FLUME-3138

Events are dropped when schemaURL is set in the Flume configuration: the sink still expects the schema URL in the event headers, contrary to FLUME-2810

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.7.0
    • Fix Version/s: None
    • Component/s: Configuration
    • Labels: None
    • Environment: Flume 1.7

    Description

      I have Avro data arriving on a Kafka topic. Flume reads the events from Kafka and, using the Kite dataset sink, writes them to HDFS as Parquet data.
      The Flume sink configuration is as follows:
      agent.sinks.k1.channel = c1
      agent.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
      agent.sinks.k1.kite.dataset.uri = dataset:hdfs://namenodeHA/kite/avro_to_parquet_item2
      agent.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
      agent.sinks.k1.hdfs.filePrefix=parquetdata
      agent.sinks.k1.hdfs.fileSuffix = .parquet
      agent.sinks.k1.hdfs.fileType=DataStream
      #agent.sinks.k1.hdfs.rollInterval=30
      #agent.sinks.k1.hdfs.rollCount=1
      #agent.sinks.k1.hdfs.batchSize=1
      agent.sinks.k1.kite.batchSize=2
      agent.sinks.k1.kite.rollInterval=30
      agent.sinks.k1.kite.flushable.commitOnBatch=true
      #agent.sinks.k1.hdfs.path = hdfs://namenodeHA/user/flumetest
      #agent.sinks.k1.serializer.compressionCodec = snappy
      agent.sinks.k1.serializer.schemaURL = hdfs://namenodeHA/kite/item.avsc

      I get the following exception in the Flume logs:
      2017-07-31 06:18:40,796 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:153)] Got brand-new compressor [.snappy]
      2017-07-31 06:18:40,802 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.kitesdk.data.spi.filesystem.FileSystemWriter.initialize(FileSystemWriter.java:147)] Opened output appender ParquetAppender{path=hdfs://namenodeHA/kite/avro_to_parquet_item2/.6d1019b3-96c4-4334-b737-af260d17aac4.parquet.tmp, schema={"type":"record","name":"item","namespace":"item.avro","fields":[{"name":"i_item_sk","type":..................................{"name":"i_manager_id","type":["null","int"]},{"name":"i_product_name","type":["null","string"]}]}, fileSystem=DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_2077692400_17, ugi=root (auth:SIMPLE)]], avroParquetWriter=parquet.avro.AvroParquetWriter@31ffba30} for hdfs://namenodeHA/kite/avro_to_parquet_item2/6d1019b3-96c4-4334-b737-af260d17aac4.parquet
      2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:39)] Event delivery failed: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
      2017-07-31 06:18:40,803 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
      org.apache.flume.EventDeliveryException: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
      at org.apache.flume.sink.kite.policy.RetryPolicy.handle(RetryPolicy.java:42)
      at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:375)
      at org.apache.flume.sink.kite.DatasetSink.process(DatasetSink.java:301)
      at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
      at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.flume.sink.kite.NonRecoverableEventException: No schema in event headers. Headers must include either flume.avro.schema.url or flume.avro.schema.literal
      at org.apache.flume.sink.kite.parser.AvroParser.schema(AvroParser.java:185)
      at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:155)
      at org.apache.flume.sink.kite.parser.AvroParser.parse(AvroParser.java:56)
      at org.apache.flume.sink.kite.DatasetSink.write(DatasetSink.java:366)
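
      Note: the stack trace shows the check failing in the Kite sink's AvroParser, which looks for the schema in the event headers (flume.avro.schema.url or flume.avro.schema.literal), not in the sink configuration. A possible workaround, not confirmed in this report, is to stamp the header on every event with Flume's static interceptor on the source. The sketch below assumes the Kafka source is named r1 (a hypothetical name; the source definition is not shown above) and reuses the schema location from the serializer.schemaURL line:
      # Assumption: r1 is the Kafka source feeding channel c1; substitute the real source name.
      agent.sources.r1.interceptors = i1
      agent.sources.r1.interceptors.i1.type = static
      # Header name taken from the error message above.
      agent.sources.r1.interceptors.i1.key = flume.avro.schema.url
      # Same schema file as agent.sinks.k1.serializer.schemaURL.
      agent.sources.r1.interceptors.i1.value = hdfs://namenodeHA/kite/item.avsc
      This only addresses the missing header; whether the sink should also honor serializer.schemaURL is the question this issue raises.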

          People

            Assignee: Unassigned
            Reporter: Ramgopal N (ramgopalnaali)