Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.7.0
None
None
Environment: Flume1.7
Description
I have Avro data arriving on a Kafka topic. Flume reads the events from Kafka and, using the Kite DatasetSink together with HDFS sink settings, should write them to HDFS as Parquet data.
The Flume config is as follows:
agent.sinks.k1.channel = c1
agent.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
agent.sinks.k1.kite.dataset.uri = dataset:hdfs://namenodeHA/kite/avro_to_parquet_item2
agent.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent.sinks.k1.hdfs.filePrefix=parquetdata
agent.sinks.k1.hdfs.fileSuffix = .parquet
agent.sinks.k1.hdfs.fileType=DataStream
#agent.sinks.k1.hdfs.rollInterval=30
#agent.sinks.k1.hdfs.rollCount=1
#agent.sinks.k1.hdfs.batchSize=1
agent.sinks.k1.kite.batchSize=2
agent.sinks.k1.kite.rollInterval=30
agent.sinks.k1.kite.flushable.commitOnBatch=true
#agent.sinks.k1.hdfs.path = hdfs://namenodeHA/user/flumetest
#agent.sinks.k1.serializer.compressionCodec = snappy
agent.sinks.k1.serializer.schemaURL = hdfs://namenodeHA/kite/item.avsc
I am getting the following exception in the Flume logs:
2017-07-31 06:18:40,796 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:153)] Got brand-new compressor [.snappy]
2017-07-31 06:18:40,802 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.kitesdk.data.spi.filesystem.FileSystemWriter.initialize(FileSystemWriter.java:147)] Opened output appender ParquetAppender{path=hdfs://namenodeHA/kite/avro_to_parquet_item2/.6d1019b3-96c4-4334-b737-af260d17aac4.parquet.tmp, schema={"type":"record","name":"item","namespace":"item.avro","fields":[{"name":"i_item_sk","type":..................................
,
{"name":"i_product_name","type":["null","string"]}]}, fileSystem=DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_2077692400_17, ugi=root (auth:SIMPLE)]], avroParquetWriter=parquet.avro.AvroParquetWriter@31ffba30} for hdfs://namenodeHA/kite/avro_to_parquet_item2/6d1019b3-96c4-4334-b737-af260d17aac4.parquet
2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:39)] Event delivery failed: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
    at org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:42)
    at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:375)
    at org.apache.flume.sink.kite.DatasetSink.process(DatasetSink.java:301)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
    at org.apache.flume.sink.kite.parser.AvroParser.schema(AvroParser.java:185)
    at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:155)
    at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:56)
    at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:366)
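The exception says each event must carry either a flume.avro.schema.url or a flume.avro.schema.literal header. One possible way to supply it is a static interceptor on the source, sketched below. This is a minimal sketch, not a confirmed fix: the source name r1 and the interceptor name i1 are hypothetical (the source section of the config is not shown above), and it assumes the event bodies are Avro-encoded with the schema in item.avsc.

```properties
# Hypothetical source name "r1" and interceptor name "i1"; adjust to the real agent config.
agent.sources.r1.interceptors = i1
# The static interceptor adds a fixed header to every event passing through the source.
agent.sources.r1.interceptors.i1.type = static
# Header name taken from the error message above.
agent.sources.r1.interceptors.i1.key = flume.avro.schema.url
# Assumption: reuses the schema file already referenced by serializer.schemaURL.
agent.sources.r1.interceptors.i1.value = hdfs://namenodeHA/kite/item.avsc
```

With this in place, the Kite sink's AvroParser should find the schema URL in the event headers. Whether the events are then written correctly still depends on the body format, which this sketch does not address.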