Details
-
Bug
-
Status: Open
-
Minor
-
Resolution: Unresolved
-
1.6.2
-
None
Description
I created Specific class for Kafka topic.
Avro schema includes logicalTypes.
Then I want to read data using following code:
val deserializationSchema = ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[mySpecificClass], schemaRegistryUrl) val kafkaStream = env.addSource( new FlinkKafkaConsumer011(topic, deserializationSchema, kafkaProperties) ) kafkaStream.print()
Result:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623) at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511) at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:645) at TransactionEnrichment$.main(TransactionEnrichment.scala:50) at TransactionEnrichment.main(TransactionEnrichment.scala) Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime at platform_tbl_game_transactions_v1.Value.put(Value.java:222) at org.apache.avro.generic.GenericData.setField(GenericData.java:690) at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74) at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748)
When using Kafka Consumer there was a hack for this to use LogicalConverters.
Unfortunately it's not working in flink.
SpecificData.get.addLogicalTypeConversion(new TimeConversions.TimestampConversion)
Problem probably is cause by the fact we're creating own instance of SpecificData
https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java#L145
And there is no logical conversions added.
Attachments
Issue Links
- links to