Questions tagged [apache-kafka]

Apache Kafka is a distributed streaming platform designed to store and process high-throughput data streams.

0 votes, 0 answers, 9 views

Kafka Consumer Hangs Indefinitely after Rebalancing

I am trying to use a Kafka consumer library that was written in-house at my organization. It takes JSON data from a Kafka topic and stores it in a Mongo database. While I cannot post this code, it is a ...
0 votes, 0 answers, 7 views

Preventing KStream from emitting old unchanged aggregate value

I have a KStream pipeline which groups by key, windows on some interval, and then applies a custom aggregation: KStream<String, Integer> input = /* define input stream */ /* ...
0 votes, 0 answers, 12 views

Suppress aggregation until custom condition

I am using the Kafka Streams DSL. How would I suppress the output of an aggregation (similar to this behavior) with a custom condition? Let's say that for every key I may have a START and a STOP event. I ...
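For context, the DSL's suppress() only offers time-based conditions (Suppressed.untilWindowCloses and Suppressed.untilTimeLimit), so a custom condition such as "emit only once STOP arrives" is usually implemented with the Processor API and a state store. The plain-Java sketch below models only that buffering logic, under stated assumptions: the HashMap stands in for a key-value state store, and the Event and Type classes and the integer sum are hypothetical, not part of Kafka Streams.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Plain-Java model of "aggregate per key, emit only when STOP arrives".
 * The HashMap stands in for a Kafka Streams key-value state store (assumption);
 * Event, Type and the integer sum are hypothetical.
 */
class StartStopSketch {
    enum Type { START, DATA, STOP }

    static final class Event {
        final String key;
        final Type type;
        final int value;

        Event(String key, Type type, int value) {
            this.key = key;
            this.type = type;
            this.value = value;
        }
    }

    // Running aggregate for keys that have seen START but not yet STOP.
    private final Map<String, Integer> openSums = new HashMap<>();

    /** Returns the final aggregate on STOP; returns empty for every other event. */
    Optional<Integer> process(Event e) {
        switch (e.type) {
            case START:
                openSums.put(e.key, 0); // (re)open the aggregation for this key
                return Optional.empty();
            case DATA:
                // Data for a key without an open START is ignored.
                openSums.computeIfPresent(e.key, (k, sum) -> sum + e.value);
                return Optional.empty();
            case STOP:
                // Emit once and clear the state; empty if STOP arrives without START.
                return Optional.ofNullable(openSums.remove(e.key));
            default:
                throw new IllegalStateException("unknown type " + e.type);
        }
    }

    public static void main(String[] args) {
        StartStopSketch s = new StartStopSketch();
        s.process(new Event("k1", Type.START, 0));
        s.process(new Event("k1", Type.DATA, 3));
        s.process(new Event("k1", Type.DATA, 4));
        System.out.println(s.process(new Event("k1", Type.STOP, 0))); // Optional[7]
    }
}
```

In a real topology this logic would sit inside a processor that reads and writes a registered store and forwards the value only on STOP; nothing is emitted for START or intermediate events.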
0 votes, 0 answers, 10 views

Micronaut configuration for multiple Kafka clusters

I'm working on a Micronaut application which has to connect to two different Kafka clusters: the first one has my source topics; the second one is where I have to put the data produced by the ...
0 votes, 0 answers, 12 views

Magic v1 does not support record headers

I am trying to produce a message to Kafka. I am using kafka-clients version 1.1.0 and Camel version 2.22.1, but every time I get an error saying Magic v1 does not support record headers ...
1 vote, 0 answers, 13 views

Quarkus Kafka consumer doesn't work in native mode

I have a Quarkus Kafka consumer. In JVM mode it works well. After I build the native runner with ./mvnw package -Pnative and run it in native mode, I get this exception: 2019-05-23 17:17:42,...
1 vote, 1 answer, 18 views

Kafka logs deleted even after setting log.retention.ms = -1

I'm running a Hyperledger Fabric 1.2.0 network with 5 Kafka brokers and 3 ZooKeeper nodes. The issue I'm facing is that even after setting log.retention.ms = -1, Kafka deleted a few initial logs. To my knowledge, after ...
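Per the broker configuration reference, log.retention.ms takes precedence over log.retention.minutes, which takes precedence over log.retention.hours (default 168), and a value of -1 disables the time limit. A topic-level retention.ms override, if one exists, wins over the broker default for that topic, and size-based retention (log.retention.bytes / retention.bytes) is evaluated separately. The sketch below is a simplified model of that precedence, handy for checking which value a topic actually ends up with; it is not Kafka's code, and the method names are hypothetical.

```java
/**
 * Simplified model of how time-based retention resolves for one topic.
 * Not Kafka's implementation; method names are hypothetical. Pass null for "not set".
 */
class RetentionSketch {
    static final long DEFAULT_RETENTION_HOURS = 168L;

    /** Returns the effective retention in ms; a negative value means "no time limit". */
    static long effectiveRetentionMs(Long topicRetentionMs, Long logRetentionMs,
                                     Long logRetentionMinutes, Long logRetentionHours) {
        if (topicRetentionMs != null) return topicRetentionMs;   // per-topic override wins
        if (logRetentionMs != null) return logRetentionMs;       // broker: ms beats minutes and hours
        if (logRetentionMinutes != null) return logRetentionMinutes * 60_000L;
        long hours = (logRetentionHours != null) ? logRetentionHours : DEFAULT_RETENTION_HOURS;
        return hours * 3_600_000L;
    }

    /**
     * Simplified: a closed segment becomes eligible for time-based deletion once its
     * newest record timestamp is older than the retention. Negative retention = never.
     */
    static boolean segmentExpired(long retentionMs, long newestRecordTimestampMs, long nowMs) {
        return retentionMs >= 0 && nowMs - newestRecordTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        // Broker sets log.retention.ms=-1, but a topic-level retention.ms of 7 days overrides it.
        long effective = effectiveRetentionMs(604_800_000L, -1L, null, null);
        System.out.println(effective); // 604800000
    }
}
```

If the model says "no limit" for the configured values, the deletions would have to come from something outside time-based retention, such as a size limit or a topic-level override that was not expected.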
1 vote, 1 answer, 33 views

How can I use “MQTT protocol with Kafka as a broker”?

I want to implement a chat application which uses the MQTT protocol to send messages from the device (an Android phone). I need a Kafka broker which would run on the server and listen to these ...
0 votes, 0 answers, 8 views

Mirror data from a Kafka 0.8.2.1 cluster to a Kafka 2.2.0 cluster

I want to use Apache Spark Structured Streaming with Kafka. Spark Structured Streaming supports Kafka 0.10 and above, but my Kafka cluster runs Kafka 0.8.2.1. I want to replicate some of ...
0 votes, 1 answer, 17 views

Real-time data integration: how do Kafka, Hadoop, Avro and HDFS fit together, and what kinds of architectures are there for data integration?

I am trying to understand what architectures there are for real-time data integration and how all the pieces fit together. I have tried researching on the internet, but I could not find good resources. ...
0 votes, 0 answers, 13 views

Too many open files in a project using Spring Cloud Stream and Kafka after upgrading to 2.1

After upgrading to Spring Cloud Stream 2.1 in a project using multiple binders, Kafka (2 brokers) and RabbitMQ (1 broker), we are facing a "too many open files" problem. The number of open files keeps growing to ...
1 vote, 0 answers, 12 views

How to write a Fluentd configuration file for sending logs to Kafka?

I've been working on a syslog application in which I have to collect different kinds of logs using Fluentd and send them to Cassandra for later analysis. I have to use Kafka as a ...
0 votes, 0 answers, 16 views

Can we use multiple queues for all the topics, like one dedicated queue for each topic in Apache Kafka?

I want to create a dedicated queue for each topic in Kafka. Is that possible or feasible?
2 votes, 0 answers, 29 views

Spark Structured Streaming OutOfMemoryError caused by thousands of KafkaMbean instances

A Spark Structured Streaming executor fails with OutOfMemoryError. Checking the heap allocation with VisualVM indicates that JMX MBean server memory usage grows linearly with time. After a further ...
0 votes, 0 answers, 13 views

Consume pushed Kafka data into a DataFrame

I am consuming data from Kafka, which works, but now I want to perform some operations on it by saving the data in a DataFrame. I am new to Kafka and don't know how to go about ...
0 votes, 0 answers, 24 views

KafkaStreams KTable compaction

In my application, almost every incoming record's value is nulled out (tombstoned) within a few seconds; however, I don't see compaction happening, as the RocksDB size is not shrinking and it still has ...
1 vote, 1 answer, 23 views

How to make GetKafka always read from latest offset in Apache NiFi?

I'm using the GetKafka processor of Apache NiFi to read messages. When I restart the processor, I'd like it to always read from the latest offset instead of the offset this group committed. How can I achieve ...
1 vote, 0 answers, 25 views

How to join three DStreams in Spark streaming using Python

I have three Kafka producers sending data streams to the same topic at random intervals of 5-10 seconds. There is a Spark consumer (Python-based) which is consuming the data. The ...
2 votes, 2 answers, 53 views

Is it possible to have one Kafka consumer thread per topic?

We have a Spring Boot application that uses Spring-Kafka (2.1.7). We have enabled concurrency, so we can have one consumer thread per partition. So currently, if we have 3 topics, each with 2 partitions, ...
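Which partitions each container thread receives is decided by the group's partition assignor, not by concurrency alone. Kafka's RangeAssignor (the long-standing default) distributes partitions topic by topic; the simplified model below reproduces that in plain Java: per topic, consumers are sorted, each gets floor(P / C) consecutive partitions, and the first P % C consumers get one extra. It is an illustration only (the real assignor sorts by member id and lives in the client library). With 3 topics x 2 partitions and concurrency 2, each consumer ends up with one partition from every topic rather than one whole topic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of range-style assignment (illustration only, not Kafka's RangeAssignor class). */
class RangeAssignSketch {

    /** Maps each consumer id to the "topic-partition" strings it would own. */
    static Map<String, List<String>> assign(List<String> consumers, Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<String>> result = new TreeMap<>();
        for (String c : sorted) {
            result.put(c, new ArrayList<>());
        }
        if (sorted.isEmpty()) {
            return result; // nothing to assign to
        }
        // Topics are handled independently, in name order.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            int numPartitions = topic.getValue();
            int perConsumer = numPartitions / sorted.size();
            int extra = numPartitions % sorted.size();
            int nextPartition = 0;
            for (int i = 0; i < sorted.size(); i++) {
                // The first `extra` consumers receive one additional partition.
                int count = perConsumer + (i < extra ? 1 : 0);
                for (int j = 0; j < count; j++) {
                    result.get(sorted.get(i)).add(topic.getKey() + "-" + nextPartition++);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 topics x 2 partitions, concurrency 2 => two consumers in the same group.
        Map<String, Integer> topics = Map.of("topicA", 2, "topicB", 2, "topicC", 2);
        System.out.println(assign(List.of("consumer-0", "consumer-1"), topics));
        // prints {consumer-0=[topicA-0, topicB-0, topicC-0], consumer-1=[topicA-1, topicB-1, topicC-1]}
    }
}
```

Running the same model with six consumers leaves four of them idle, because each topic has only two partitions. The Spring for Apache Kafka reference describes this effect for listeners on multiple topics and mentions RoundRobinAssignor as one alternative.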
0 votes, 1 answer, 35 views

Encryption, authentication and external access for Confluent Kafka on Kubernetes

I am trying to configure encryption, authentication and external access for Confluent Kafka on Kubernetes. Helm chart: https://github.com/confluentinc/cp-helm-charts. Documentation I am following: https://...
1 vote, 1 answer, 30 views

Reading a CSV file line by line and producing streams with some lines sent with a random delay

I am concurrently reading 3 CSV files line by line and sending the lines to a message queue (Apache Kafka). The data rows are ordered by increasing timestamp values. I am simulating the streams by looking ...
1 vote, 0 answers, 23 views

Confluent - Splitting Avro messages from one Kafka topic into multiple Kafka topics

We have an incoming Kafka topic with messages based on multiple Avro schemas serialized into it. We need to split the messages in Avro format into multiple other Kafka topics based on a certain value of a ...
0 votes, 1 answer, 31 views

I'm having a hard time setting up Kafka on GKE; what is the best way to set it up?

I was trying to use a StatefulSet to deploy ZooKeeper and the Kafka server in a GKE cluster, but communication between Kafka and ZooKeeper fails with an error message in the logs. I'd like to know ...
1 vote, 0 answers, 19 views

Kafka aggregation: Issue when grouping key changes

We are using a Kafka KTable for aggregation, and below is the kind of data we receive as input. Input data: Transaction Detail (transaction Id, status, category, amount, ...). We group the above based on ...
-1 votes, 0 answers, 19 views

KafkaAvroDeserializer is not an instance of Deserializer

The Java code below throws the exceptions shown at the bottom. Environment: Tomcat, Java 8. KafkaConsumer<String, Map<ConfidentialityLevel, CommonCoreEvent>> consumer; Properties props ...
0 votes, 1 answer, 26 views

What is the impact of acknowledgement modes on asynchronous send in Kafka?

Do the acknowledgement modes ('0', '1', 'all') have any impact when we send messages using the asynchronous send() call? I have tried measuring the send call latency (that is, by recording the time before and ...
1 vote, 0 answers, 15 views

How to set up a proxy layer for a Kafka broker?

I am trying to set up a proxy server (preferably in Java) that can pass my connection stream to the Kafka broker: Client (Consumer/Producer) <--> [PROXY SERVER] <--> Kafka Broker. My use case ...
0 votes, 0 answers, 14 views

Apache Spark temp files size on disk

I have a setup where incoming data from a Kafka cluster is processed by an Apache Spark streaming job. Version info: Kafka 0.8.x, Spark 2.3.1. Recently, when the capacity of the Kafka cluster ...
0 votes, 1 answer, 30 views

How to schedule a Kafka consumer properly in Apache NiFi?

I'm trying to use one consumer to continuously read data from Kafka. How should I set the scheduling options? I have read the User Guide, but I cannot figure out how to set the run schedule and run ...
0 votes, 0 answers, 14 views

Configure log4j.properties for Kafka appender, error when parsing property bootstrap.servers

I want to add a Kafka appender to the audit-hdfs log in a Cloudera cluster. I have successfully configured a log4j2.xml file with a Kafka appender; now I need to convert this log4j2.xml into a log4j2....
0 votes, 0 answers, 17 views

Kafka Connect JDBC source is not reading data from the DB, so there is no data in the Kafka topic

We configured the Kafka Connect JDBC source to read data from DB2 and publish it to a Kafka topic, and we are using one of the columns of type timestamp as timestamp.column.name, but I see that Kafka Connect is not ...
2 votes, 3 answers, 82 views

Stream join example with Apache Kafka?

I was looking for an example using Kafka Streams of how to do this sort of thing, i.e. join a customers table with an addresses table and sink the data to ES: Customers +------+------------+---------...
2 votes, 0 answers, 26 views

How to include both “latest” and “JSON with specific Offset” in “startingOffsets” while importing data from Kafka into Spark Structured Streaming

I have a streaming query saving data into a file sink. I am using .option("startingOffsets", "latest") and a checkpoint location. If there is any downtime on Spark, and when the streaming query starts ...
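According to the Spark Structured Streaming + Kafka integration guide, startingOffsets accepts "earliest", "latest", or a JSON string such as {"topicA":{"0":23,"1":-1}}, where -1 in the JSON means latest and -2 means earliest for that partition, so a single JSON value can mix explicit offsets with "latest". The guide also notes that startingOffsets only applies when a new query is started; a query resumed from its checkpoint continues from the checkpointed offsets. The hypothetical Java helper below only builds such a string; it assumes topic names need no JSON escaping.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper that builds the JSON form of Spark's Kafka "startingOffsets" option.
 * Assumes topic names need no JSON escaping.
 */
class StartingOffsetsSketch {
    static final long LATEST = -1L;   // per-partition "latest" in the JSON form
    static final long EARLIEST = -2L; // per-partition "earliest" in the JSON form

    static String toJson(Map<String, Map<Integer, Long>> offsets) {
        StringBuilder sb = new StringBuilder("{");
        boolean firstTopic = true;
        for (Map.Entry<String, Map<Integer, Long>> topic : new TreeMap<>(offsets).entrySet()) {
            if (!firstTopic) sb.append(',');
            firstTopic = false;
            sb.append('"').append(topic.getKey()).append("\":{");
            boolean firstPartition = true;
            for (Map.Entry<Integer, Long> p : new TreeMap<>(topic.getValue()).entrySet()) {
                if (!firstPartition) sb.append(',');
                firstPartition = false;
                // Partition ids are JSON object keys, so they are written as quoted strings.
                sb.append('"').append(p.getKey()).append("\":").append(p.getValue());
            }
            sb.append('}');
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        // Partition 0 starts at offset 23; partition 1 starts from latest.
        Map<String, Map<Integer, Long>> offsets = Map.of("topicA", Map.of(0, 23L, 1, LATEST));
        System.out.println(toJson(offsets)); // {"topicA":{"0":23,"1":-1}}
    }
}
```

The resulting string would then be passed as .option("startingOffsets", json) on the Kafka source of a new query.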
1 vote, 1 answer, 41 views

How to reduce lag in a Kafka Streams-based application

I have a real environment with a 3-machine Kafka cluster which is receiving a lot of data. Each topic has 25 partitions with the replication factor set to 2. My application which (Kafka Streams based ...
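Consumer lag for a partition is its log-end offset minus the group's committed offset (the LOG-END-OFFSET, CURRENT-OFFSET and LAG columns printed by kafka-consumer-groups.sh --describe), and total lag is the sum over partitions. Lag only goes down when the application consumes faster than data arrives; in Kafka Streams, parallelism is roughly capped by the number of input partitions (25 here), spread across stream threads and instances. The sketch below does only the lag arithmetic on two offset snapshots; the numbers in main are made up, and treating a partition with no committed offset as committed at 0 is a simplifying assumption.

```java
import java.util.Map;

/** Lag arithmetic on offset snapshots; the offsets in main() are invented for illustration. */
class LagSketch {

    /** Records written to the partition but not yet committed by the group. */
    static long partitionLag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    /**
     * Sums lag over all partitions present in logEnd. A partition with no committed
     * offset is treated as committed at 0 (a simplifying assumption).
     */
    static long totalLag(Map<Integer, Long> logEnd, Map<Integer, Long> committed) {
        long total = 0L;
        for (Map.Entry<Integer, Long> e : logEnd.entrySet()) {
            long committedOffset = committed.getOrDefault(e.getKey(), 0L);
            total += partitionLag(e.getValue(), committedOffset);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, Long> logEnd = Map.of(0, 1_200L, 1, 900L, 2, 1_000L);
        Map<Integer, Long> committed = Map.of(0, 1_000L, 1, 900L, 2, 400L);
        System.out.println(totalLag(logEnd, committed)); // 800
    }
}
```

Sampling the total twice a few minutes apart shows whether the application is falling further behind or catching up.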
1 vote, 1 answer, 30 views

How can I run KafkaStream application with Micronaut?

From https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaStream, I can see there is an easy way of running an app which runs KStream. If I don't want to use KStream, but want to ...
0 votes, 2 answers, 32 views

Is Kafka streaming possible without CDC?

I'm working on a project at work, and I'm curious whether it is possible to implement Kafka on a database without CDC (CDC isn't turned on all the time on it). I can't find any resources online that discuss ...
0 votes, 1 answer, 27 views

Understanding max.task.idle.ms in Kafka Streams for a KStream-KTable join

I need help understanding Kafka Streams behavior when max.task.idle.ms is used in Kafka 2.2. I have a KStream-KTable join where the KStream has been re-keyed: KStream stream1 = builder.stream("topic1"...
0 votes, 1 answer, 18 views

Install Custom Connector To Kafka Connect on Kubernetes

I'm running the Kafka Kubernetes Helm deployment; however, I am unsure how to install a custom plugin. When running a custom plugin on my local version of Kafka, I mount the volume /myplugin to the ...
0 votes, 1 answer, 22 views

What is the procedure when a Kafka disk crashes, and how do I remove the old topics after the crash?

We have 3 Kafka machines in the HDP cluster (kafka01, kafka02, kafka03), Kafka version 0.10.0.2.6. Each Kafka machine (kafka03) has an 18T disk, and default.replication.factor=3. The last ...
1 vote, 1 answer, 18 views

In the new Kafka Consumer API (versions >0.9), what property replaces consumer.timeout.ms?

I cannot figure out from the documentation which property now replaces consumer.timeout.ms=-1 (the default), which was available in Kafka Consumer API versions before 0.9. Does anyone have the link to the ...
1 vote, 2 answers, 34 views

How can I get the broker that a topic belongs to in Java

I have a cluster of Kafka brokers. I am trying to get metrics at the topic level, which I have done successfully for topics residing on the specific broker that the code points to. ZooKeeper returns ...
0 votes, 2 answers, 45 views

Java Kafka: get message offset in partition by unique key

If all my Kafka messages contain a unique key, what is the fastest way to query the offset of a message by its unique key within a partition on the broker? Assume I already know this message ...
0 votes, 1 answer, 18 views

Serialize object for Kafka in Flink

I'm trying to use Flink to read data from Kafka, apply some function, and write the result to a different Kafka topic, but I am getting the following error. `org.apache.flink.api.common.InvalidProgramException: ...
0 votes, 0 answers, 23 views

How to fix Kafka-Spark streaming error when using JavaInputDStream for Direct Stream?

I was doing simple Kafka-Spark streaming using Direct Stream as done in https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html, and I am compiling it as a single Java file (no Maven) ...
0 votes, 1 answer, 21 views

Kafka stream KTable changelog TTL

Let’s say I have an A-Event KStream aggregated into an A-Snapshot KTable and a B-Event KStream aggregated into a B-Snapshot KTable. Neither A-Snapshot nor B-Snapshot conveys null values (delete events ...
0 votes, 0 answers, 30 views

Stream application dies due to “Failed to rebalance” and “Unexpected error in InitProducerIdResponse”

My Kafka Streams application throws the following exception and dies, after the Kafka cluster ran out of disk space and went down. The cluster is up and running properly now, but the stream app still ...
1 vote, 0 answers, 12 views

How to mask fields with custom or random values in an Apache Kafka source connector and then unmask the message in a Kafka sink connector?

{ transforms=SSNMask,IPMask,PhoneMask transforms.SSNMask.type=org.apache.kafka.connect.transforms.MaskField$Value transforms.SSNMask.fields=ssn } The transforms should have some additional property to ...
0 votes, 1 answer, 46 views

Publish Kafka objects in batches

I am using the KafkaProducer.send() method to publish records to Kafka. This is an asynchronous method. My application publishes about 20k records to Kafka. Often, after successfully sending ...
0 votes, 1 answer, 21 views

How to send a message key with a unique identifier (eventID/UUID/filename) from NiFi to a Kafka topic and see it in the Kafka logs

I am trying to send a message (a JSON file) to a Kafka topic (publish) and to use the filename or the UUID as the Kafka key so I can trace it in the logs. The message is sent and consumed later on, but I ...
1 vote, 2 answers, 24 views

How to check Kafka server status or details?

Is there a command to show the details or the status of a Kafka server? (I am not trying to find out whether the Kafka server is running.) I can only find information on topics, partitions, ...