Details
Type: Bug
Status: Open
Priority: Blocker
Resolution: Unresolved
Affects Version: 1.7.0
Description
With the configuration below, Avro messages from a Kafka topic are pulled and written to HDFS successfully. However, deserializing the resulting file with avro-tools (java -jar avro-tools-1.8.1.jar tojson FlumeData.1484909338012 > flume) throws an exception.
The Flume configuration and the Avro-related files are listed below.
agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.zookeeperConnect = machinemae:2181
agent1.sources.kafka-source.topic = unverified
agent1.sources.kafka-source.groupId = flume
agent1.sources.kafka-source.channels = memory-channel
agent1.sources.kafka-source.interceptors = i1
agent1.sources.kafka-source.interceptors.i1.type = timestamp
agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100
#agent1.sources.kafka-source.useFlumeEventFormat = true
agent1.channels.memory-channel.type = memory
agent1.channels.memory-channel.capacity = 10000
agent1.channels.memory-channel.transactionCapacity = 1000
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.fileSuffix=.avro
agent1.sinks.hdfs-sink.hdfs.path = /company/jar/source/gu33/s4/1.35/%
/%y-%m-%d
agent1.sinks.hdfs-sink.hdfs.rollInterval = 5
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 0
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
#agent1.sinks.hdfs-sink.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink.channel = memory-channel
#agent1.sinks.hdfs-sink.serializer = avro_event
agent1.sinks.hdfs-sink.serializer.compressionCodec = snappy
agent1.sinks.hdfs-sink.serializer=org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent1.sinks.hdfs-sink.serializer.schemaURL = hdfs://machinemane:9000/ca/gu33.avsc
agent1.sources = kafka-source
agent1.channels = memory-channel
agent1.sinks = hdfs-sink
############################
The Avro schema and a sample message follow.
Avro schema:
{
"type" : "record",
"name" : "xmenHeader",
"namespace" : "com.company.xmen",
"fields" : [ {
"name" : "header",
"type" : {
"type" : "record",
"name" : "header",
"fields" : [
,
{ "name" : "doc_type_id", "type" : [ "null", "string" ], "default" : "null" },
{ "name" : "unique_id", "type" : [ "null", "string" ], "default" : "null" },
{ "name" : "doc_type_version", "type" : [ "null", "string" ], "default" : "null" },
{ "name" : "product_id", "type" : [ "null", "string" ], "default" : "null" } ]
}
}, {
"name" : "body",
"type" : {
"type" : "record",
"name" : "body",
"fields" : [ {
"name" : "name",
"type" : [ "null", {
"type" : "record",
"name" : "name_name_0",
"fields" : [
]
} ],
"default" : "null"
} ]
}
} ]
}
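One detail of the schema above can be checked mechanically. The Avro specification says that the default of a union-typed field must match the first branch of the union, so a field typed [ "null", "string" ] would normally use JSON null rather than the string "null". Whether this is related to the reported exception is not established here. The following is only a sketch: the function name find_suspect_defaults is hypothetical, and the sample schema is a trimmed copy of the one above with one default changed to null for contrast.

```python
import json


def find_suspect_defaults(schema, path=""):
    """Yield dotted paths of record fields whose union type starts with
    "null" but whose default is the string "null" instead of JSON null."""
    if isinstance(schema, list):
        # A union: inspect every branch.
        for branch in schema:
            yield from find_suspect_defaults(branch, path)
    elif isinstance(schema, dict):
        if schema.get("type") == "record":
            for field in schema.get("fields", []):
                field_path = f"{path}.{field['name']}" if path else field["name"]
                ftype = field.get("type")
                if (isinstance(ftype, list) and ftype and ftype[0] == "null"
                        and field.get("default") == "null"):
                    yield field_path
                yield from find_suspect_defaults(ftype, field_path)
        else:
            # Arrays and maps carry nested schemas in "items" / "values".
            for key in ("items", "values"):
                if key in schema:
                    yield from find_suspect_defaults(schema[key], path)


# Trimmed copy of the schema from this report. The product_id default was
# changed to JSON null here purely to show the contrast (an assumption).
SAMPLE = json.loads("""
{
  "type": "record",
  "name": "xmenHeader",
  "namespace": "com.company.xmen",
  "fields": [ {
    "name": "header",
    "type": {
      "type": "record",
      "name": "header",
      "fields": [
        { "name": "doc_type_id", "type": [ "null", "string" ], "default": "null" },
        { "name": "product_id", "type": [ "null", "string" ], "default": null }
      ]
    }
  } ]
}
""")

suspects = list(find_suspect_defaults(SAMPLE))
print(suspects)  # ['header.doc_type_id']
```

Run against the full gu33.avsc, a check like this would list every field that uses "default" : "null" with a null-first union.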
Actual JSON message:
{
"header": {
"product_id": "GU33",
"tenant_id": "tenant_name",
"doc_type_id": "s4",
"doc_type_version": "1.35"
},
"body":
{"name" : {"app_id":"testApp_ID"}}
}
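As a first diagnostic for the tojson failure described above, it may help to confirm that the file written by the HDFS sink is an Avro object container file at all. Per the Avro specification, such files begin with the four bytes 'O', 'b', 'j', 0x01. The sketch below assumes the file has already been copied from HDFS to the local file system; the function names are hypothetical.

```python
import io

# Magic bytes that open every Avro object container file.
AVRO_MAGIC = b"Obj\x01"


def looks_like_avro_container(stream):
    """Return True if the binary stream starts with the Avro magic bytes."""
    return stream.read(4) == AVRO_MAGIC


def check_file(path):
    """Open a local file in binary mode and test its first four bytes."""
    with open(path, "rb") as f:
        return looks_like_avro_container(f)


# In-memory demonstration with made-up contents (not real Flume output).
print(looks_like_avro_container(io.BytesIO(b"Obj\x01\x00")))      # True
print(looks_like_avro_container(io.BytesIO(b'{"header": {}}')))   # False
```

For the file in this report the call would be check_file("FlumeData.1484909338012"). A False result would suggest the sink wrote raw event bodies rather than a container file; a True result would point the investigation at the records or the schema instead.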