  1. Flume
  2. FLUME-3047

Avro Sink HDFS with org.apache.flume.sink.hdfs.AvroEventSerializer$Builder not working

Details

    • Type: Bug
    • Status: Open
    • Priority: Blocker
    • Resolution: Unresolved
    • Affects Version/s: 1.7.0
    • Fix Version/s: None
    • Component/s: Client SDK, Sinks+Sources
    • Labels: None

    Description

      With the configuration below, Avro messages are pulled from the Kafka topic and written to HDFS successfully. However, deserializing the resulting file with avro-tools (java -jar avro-tools-1.8.1.jar tojson FlumeData.1484909338012 > flume) throws an exception.
      The Flume configuration, the Avro schema, and a sample message follow.

      agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
      agent1.sources.kafka-source.zookeeperConnect = machinemae:2181
      agent1.sources.kafka-source.topic = unverified
      agent1.sources.kafka-source.groupId = flume
      agent1.sources.kafka-source.channels = memory-channel
      agent1.sources.kafka-source.interceptors = i1
      agent1.sources.kafka-source.interceptors.i1.type = timestamp
      agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100
      #agent1.sources.kafka-source.useFlumeEventFormat = true
      agent1.channels.memory-channel.type = memory
      agent1.channels.memory-channel.capacity = 10000
      agent1.channels.memory-channel.transactionCapacity = 1000

      agent1.sinks.hdfs-sink.type = hdfs
      agent1.sinks.hdfs-sink.hdfs.fileSuffix=.avro
      agent1.sinks.hdfs-sink.hdfs.path = /company/jar/source/gu33/s4/1.35/%{topic}/%y-%m-%d
      agent1.sinks.hdfs-sink.hdfs.rollInterval = 5
      agent1.sinks.hdfs-sink.hdfs.rollSize = 0
      agent1.sinks.hdfs-sink.hdfs.rollCount = 0
      agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
      #agent1.sinks.hdfs-sink.hdfs.writeFormat = Text
      agent1.sinks.hdfs-sink.channel = memory-channel
      #agent1.sinks.hdfs-sink.serializer = avro_event
      agent1.sinks.hdfs-sink.serializer.compressionCodec = snappy
      agent1.sinks.hdfs-sink.serializer=org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
      agent1.sinks.hdfs-sink.serializer.schemaURL = hdfs://machinemane:9000/ca/gu33.avsc

      agent1.sources = kafka-source
      agent1.channels = memory-channel
      agent1.sinks = hdfs-sink
      ############################

      The Avro schema and a sample message are given below.
      Avro schema:
      {
      "type" : "record",
      "name" : "xmenHeader",
      "namespace" : "com.company.xmen",
      "fields" : [ {
      "name" : "header",
      "type" : {
      "type" : "record",
      "name" : "header",
      "fields" : [
      { "name" : "tenant_id", "type" : [ "null", "string" ], "default" : "null" },
      { "name" : "doc_type_id", "type" : [ "null", "string" ], "default" : "null" },
      { "name" : "unique_id", "type" : [ "null", "string" ], "default" : "null" },
      { "name" : "doc_type_version", "type" : [ "null", "string" ], "default" : "null" },
      { "name" : "product_id", "type" : [ "null", "string" ], "default" : "null" }
      ]
      }
      }, {
      "name" : "body",
      "type" : {
      "type" : "record",
      "name" : "body",
      "fields" : [ {
      "name" : "name",
      "type" : [ "null", {
      "type" : "record",
      "name" : "name_name_0",
      "fields" : [
      { "name" : "app_id", "type" : [ "null", "string" ], "default" : "null" }
      ]
      } ],
      "default" : "null"
      } ]
      }
      } ]
      }
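
One detail of the schema above is worth checking. The Avro specification says the default of a union field must match the first branch of the union, so a field typed [ "null", "string" ] needs the JSON value null as its default, not the string "null". Whether this contributes to the reported exception is not established here. The sketch below only locates such fields, using the standard json module on a trimmed copy of the schema. The function name is hypothetical.

```python
import json

# Trimmed copy of the schema from this issue (one field per record kept).
SCHEMA_JSON = """
{
  "type": "record", "name": "xmenHeader", "namespace": "com.company.xmen",
  "fields": [
    {"name": "header", "type": {"type": "record", "name": "header", "fields": [
      {"name": "tenant_id", "type": ["null", "string"], "default": "null"}
    ]}},
    {"name": "body", "type": {"type": "record", "name": "body", "fields": [
      {"name": "name", "type": ["null", {
        "type": "record", "name": "name_name_0", "fields": [
          {"name": "app_id", "type": ["null", "string"], "default": "null"}
        ]}], "default": "null"}
    ]}}
  ]
}
"""


def string_null_defaults(schema, path=""):
    """Yield dotted paths of fields whose union starts with "null" but whose
    default is the JSON string "null" instead of JSON null."""
    if isinstance(schema, list):
        # A union: inspect every branch for nested records.
        for branch in schema:
            yield from string_null_defaults(branch, path)
    elif isinstance(schema, dict) and schema.get("type") == "record":
        for field in schema.get("fields", []):
            name = f"{path}.{field['name']}" if path else field["name"]
            ftype = field["type"]
            is_nullable_union = isinstance(ftype, list) and ftype[:1] == ["null"]
            if is_nullable_union and field.get("default") == "null":
                yield name
            yield from string_null_defaults(ftype, name)


if __name__ == "__main__":
    for field_path in string_null_defaults(json.loads(SCHEMA_JSON)):
        print(field_path)
```

Each reported field would use "default" : null (without quotes) to follow the specification.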

      Actual JSON message:
      {
      "header": {
      "product_id": "GU33",
      "tenant_id": "tenant_name",
      "doc_type_id": "s4",
      "doc_type_version": "1.35"
      },
      "body": {"name" : {"app_id":"testApp_ID"}}
      }
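
Comparing the sample message with the schema's header record shows which declared fields the message leaves out. This is a minimal sketch using only the standard json module. The field list is copied from the schema in this issue and the helper name is hypothetical. It does not show how the producer encodes the record, so it is only a hint about where the data and the schema differ.

```python
import json

# Field names declared by the "header" record in the schema from this issue.
HEADER_FIELDS = [
    "tenant_id",
    "doc_type_id",
    "unique_id",
    "doc_type_version",
    "product_id",
]

# The sample message from this issue.
MESSAGE_JSON = """
{
  "header": {
    "product_id": "GU33",
    "tenant_id": "tenant_name",
    "doc_type_id": "s4",
    "doc_type_version": "1.35"
  },
  "body": {"name": {"app_id": "testApp_ID"}}
}
"""


def missing_fields(record, field_names):
    """Return the declared field names that are absent from `record`."""
    return [name for name in field_names if name not in record]


if __name__ == "__main__":
    message = json.loads(MESSAGE_JSON)
    print(missing_fields(message["header"], HEADER_FIELDS))
```

For the sample above, the only header field the message omits is unique_id.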

          People

            Assignee: Unassigned
            Reporter: Santosh Balasubramanya (contactsantoshb@gmail.com)
            Votes: 0
            Watchers: 1