Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
1.5.0
-
None
-
None
Description
I was testing a case when flume agent is reading from kafka source from topic 'sourcetopic' and sink configured to kafkasink but to other topic 'destinationtopic',
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.zookeeperConnect = localhost:2181
tier1.sources.source1.topic = sourcetopic
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = destinationtopic
tier1.sinks.sink1.brokerList = localhost:9092
tier1.sinks.sink1.channel = channel1
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
With this settings i noticed that event were not written to 'destinationtopic',
After debugging the agent if found that kafka source puts in topic name in header.
headers.put(KafkaSourceConstants.TOPIC, topic);
and in sink check is made to see if headers contain topic, if exists then we take topic name from header and write event that topic and there by discarding configured sink topic i ,e destinationtopic.
here is code snippet that does, even though variable topic as destinationtopic, since header had topic, kafka sink takes topic name from header and puts event to that topic i,e again to source topic
if ((eventTopic = headers.get(TOPIC_HDR)) == null)
{ eventTopic = topic; }Attachments
Attachments
Issue Links
- duplicates
-
FLUME-2789 Kafka sink sends messages to the source topic by mistake
- Open