Details
Question
Status: Open
Major
Resolution: Unresolved
1.6.0
None
None
Redhat
Description
I am attempting to pull data from a Confluent (Kafka) stream, where the messages are Snappy-compressed Avro byte[], and push it to an HDFS file containing the Avro data together with its schema.
My first attempt pulled the data and created files containing the serialized Avro data, but with no schema at the top of the file:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = zk:2181
tier1.sources.source1.topic = mytopicX
tier1.sources.source1.groupId = mygroupid
tier1.sources.source1.batchSize = 100
tier1.sources.source1.batchDurationMillis = 1000
tier1.sources.source1.kafka.consumer.timeout.ms = 100
tier1.sources.source1.auto.commit.enable=false
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 10000
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /some/where/useful
tier1.sinks.sink1.hdfs.rollInterval = 300
tier1.sinks.sink1.hdfs.rollSize = 134217728
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.writeFormat = Text
tier1.sinks.sink1.channel = channel1
(I then tried many different configs and finally created a custom serializer.)
On the Nth attempt I was able to create the file with the serialized schema at the top, but with no data in it. The logs contained errors indicating that the schema I provided could not be matched as a union against the data messages. The only difference was in the sink config:
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /some/where/useful
tier1.sinks.sink1.hdfs.rollInterval = 300
tier1.sinks.sink1.hdfs.rollSize = 134217728
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.writeFormat = Text
tier1.sinks.sink1.serializer = MySerializer$Builder # loaded to the flume machine
tier1.sinks.sink1.channel = channel1
The error given in the logs with this configuration was:
Caused by: org.apache.avro.UnresolvedUnionException: Not in union [ my very large schema goes here ]: [Event headers =
, body.length = 383 ]
Is there a config or method for getting the schema into the output HDFS file that I am missing? With respect to the custom serializer, I am simply extending AbstractAvroEventSerializer<Event> and really just supplying the schema for my data.
Any help, docs, or recommendations would be much appreciated, as I am stuck on this.
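For reference, one direction that may be worth investigating: Flume's HDFS sink module includes org.apache.flume.sink.hdfs.AvroEventSerializer$Builder, which writes an Avro container file and takes the record schema from an event header (flume.avro.schema.url or flume.avro.schema.literal) instead of from a custom class. The fragment below is only a sketch and has not been tested against this setup. It assumes each event body is a single binary-encoded Avro datum with no extra framing (if the producer uses Confluent's schema-registry serializer, each message carries a 5-byte prefix that would presumably have to be removed first). The interceptor name i1 and the schema location are hypothetical placeholders, and the roll settings are left out because they would stay as in the configs above.

```
# Sketch only: i1 and the .avsc location are hypothetical placeholders.
# A static interceptor stamps every event with the schema location header.
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = static
tier1.sources.source1.interceptors.i1.key = flume.avro.schema.url
tier1.sources.source1.interceptors.i1.value = hdfs://namenode/path/to/myschema.avsc

# Same sink as before, but using the serializer shipped with the HDFS sink.
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /some/where/useful
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
tier1.sinks.sink1.channel = channel1
```

This reading would also be consistent with the UnresolvedUnionException above: AbstractAvroEventSerializer<Event> appears to pass whatever convert() returns to a reflection-based Avro writer, so returning the Flume Event unchanged means the Event object itself, rather than its body, is checked against the supplied schema.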