Flume / FLUME-2760

Flafka configuration seems unclear and I get either serialized data or serialized schema in the HDFS file.


Details

    • Type: Question
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.6.0
    • Fix Version/s: None
    • Component/s: Sinks+Sources
    • Labels: None
    • Environment: Redhat

    Description

      I am attempting to pull data from a Confluent (Kafka) stream, where the messages are Snappy-compressed Avro byte[], and push it to an HDFS file containing the Avro data together with its schema.

      My first attempt resulted in pulling the data and creating files with serialized Avro data but no schema at the top of the file:
      tier1.sources = source1
      tier1.channels = channel1
      tier1.sinks = sink1

      tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
      tier1.sources.source1.zookeeperConnect = zk:2181
      tier1.sources.source1.topic = mytopicX
      tier1.sources.source1.groupId = mygroupid
      tier1.sources.source1.batchSize = 100
      tier1.sources.source1.batchDurationMillis = 1000
      tier1.sources.source1.kafka.consumer.timeout.ms = 100
      tier1.sources.source1.auto.commit.enable=false
      tier1.sources.source1.channels = channel1

      tier1.channels.channel1.type = memory
      tier1.channels.channel1.capacity = 10000
      tier1.channels.channel1.transactionCapacity = 10000

      tier1.sinks.sink1.type = hdfs
      tier1.sinks.sink1.hdfs.path = /some/where/useful
      tier1.sinks.sink1.hdfs.rollInterval = 300
      tier1.sinks.sink1.hdfs.rollSize = 134217728
      tier1.sinks.sink1.hdfs.rollCount = 0
      tier1.sinks.sink1.hdfs.fileType = DataStream
      tier1.sinks.sink1.hdfs.writeFormat = Text
      tier1.sinks.sink1.channel = channel1
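
      As far as I can tell, with fileType = DataStream and no Avro serializer configured, the sink simply writes the raw event bodies one after another, which would explain getting the serialized Avro data with no schema at the top. A quick way to confirm what kind of file came out (a minimal sketch; CheckAvroContainerFile is just a name I made up, and it assumes the rolled file has been copied off HDFS to a local path passed as the first argument) is to try opening it with Avro's DataFileReader, which only succeeds when the file starts with the Avro container header and embedded schema:

      import java.io.File;

      import org.apache.avro.file.DataFileReader;
      import org.apache.avro.generic.GenericDatumReader;
      import org.apache.avro.generic.GenericRecord;

      public class CheckAvroContainerFile {
        public static void main(String[] args) throws Exception {
          // Fails with IOException("Not a data file.") on files that contain only
          // raw datum bytes, i.e. no container header and no embedded schema.
          DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
              new File(args[0]), new GenericDatumReader<GenericRecord>());
          try {
            System.out.println("Embedded schema: " + reader.getSchema());
            long count = 0;
            while (reader.hasNext()) {
              reader.next();
              count++;
            }
            System.out.println("Records: " + count);
          } finally {
            reader.close();
          }
        }
      }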

      <try many different configs and finally create custom serializer>

      On the Nth attempt I was able to create the file with the serialized schema at the top but no data in the file. There were errors in the logs indicating that the schema I provided was not able to union with the data messages. The only difference was in the sink config:

      tier1.sinks.sink1.type = hdfs
      tier1.sinks.sink1.hdfs.path = /some/where/useful
      tier1.sinks.sink1.hdfs.rollInterval = 300
      tier1.sinks.sink1.hdfs.rollSize = 134217728
      tier1.sinks.sink1.hdfs.rollCount = 0
      tier1.sinks.sink1.hdfs.fileType = DataStream
      tier1.sinks.sink1.hdfs.writeFormat = Text
      tier1.sinks.sink1.serializer = MySerializer$Builder # loaded to the flume machine
      tier1.sinks.sink1.channel = channel1

      and the error given in the logs with this configuration:
      Caused by: org.apache.avro.UnresolvedUnionException: Not in union [ my very large schema goes here ]: [Event headers = {topic=mytopicX, key=df84dcd6-a801-477c-a0d8-aa7e526b672d, timestamp=1438883746194}, body.length = 383 ]

      Is there a config or method that I am missing for getting the schema to be part of the output HDFS file? With respect to the custom serializer, I am simply extending AbstractAvroEventSerializer<Event> and really just creating the schema for my data.
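
      For concreteness, here is a stripped-down sketch of the kind of serializer I mean (MyDataSerializer and the inline schema are placeholders, not the real class, and it assumes each Kafka message body is a plain binary-encoded Avro datum of that schema, with no Confluent wire-format prefix). My suspicion is that the UnresolvedUnionException comes from convert() handing the Flume Event (or its raw byte[] body) straight to the Avro writer, which cannot match it against the record schema; decoding the body into a GenericRecord first should avoid that:

      import java.io.IOException;
      import java.io.OutputStream;

      import org.apache.avro.Schema;
      import org.apache.avro.generic.GenericDatumReader;
      import org.apache.avro.generic.GenericRecord;
      import org.apache.avro.io.DecoderFactory;
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.serialization.AbstractAvroEventSerializer;
      import org.apache.flume.serialization.EventSerializer;

      public class MyDataSerializer extends AbstractAvroEventSerializer<GenericRecord> {

        // Placeholder schema; in the real serializer this is "my very large schema".
        private static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MyData\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"value\",\"type\":\"long\"}]}");

        private final OutputStream out;
        private final GenericDatumReader<GenericRecord> datumReader =
            new GenericDatumReader<GenericRecord>(SCHEMA);

        private MyDataSerializer(OutputStream out) {
          this.out = out;
        }

        @Override
        protected Schema getSchema() {
          return SCHEMA;
        }

        @Override
        protected OutputStream getOutputStream() {
          return out;
        }

        @Override
        protected GenericRecord convert(Event event) {
          // Decode the Kafka message body into a GenericRecord of the declared
          // schema instead of passing the Flume Event itself to the Avro writer.
          try {
            return datumReader.read(null,
                DecoderFactory.get().binaryDecoder(event.getBody(), null));
          } catch (IOException e) {
            throw new RuntimeException("Failed to decode Avro datum from event body", e);
          }
        }

        public static class Builder implements EventSerializer.Builder {
          @Override
          public EventSerializer build(Context context, OutputStream out) {
            MyDataSerializer serializer = new MyDataSerializer(out);
            serializer.configure(context);
            return serializer;
          }
        }
      }

      The Builder is then wired in exactly as in the second config above, i.e. the fully qualified MyDataSerializer$Builder class name on tier1.sinks.sink1.serializer.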

      Any help, docs or recommendations would be much appreciated as I am stuck on this.

          People

            Assignee: Unassigned
            Reporter: Chris Weaver (mqchrisw)
