Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-11030

Cannot use Avro logical types with ConfluentRegistryAvroDeserializationSchema

    XMLWordPrintableJSON

Details

    Description

      I created Specific class for Kafka topic.
      Avro schema includes logicalTypes.
      Then I want to read data using following code:

      val deserializationSchema = ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[mySpecificClass], schemaRegistryUrl)
      val kafkaStream = env.addSource(
        new FlinkKafkaConsumer011(topic, deserializationSchema, kafkaProperties)
      )
      kafkaStream.print()
      

      Result:

       Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
       at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
       at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
       at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
       at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:645)
       at TransactionEnrichment$.main(TransactionEnrichment.scala:50)
       at TransactionEnrichment.main(TransactionEnrichment.scala)
       Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
       at platform_tbl_game_transactions_v1.Value.put(Value.java:222)
       at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
       at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
       at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
       at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
       at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
       at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
       at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
       at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:142)
       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
       at java.lang.Thread.run(Thread.java:748)
       

      When using Kafka Consumer there was a hack for this to use LogicalConverters.
      Unfortunately it's not working in flink.

       SpecificData.get.addLogicalTypeConversion(new TimeConversions.TimestampConversion)
       

      Problem probably is cause by the fact we're creating own instance of SpecificData
      https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java#L145
      And there is no logical conversions added.

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              maver1ck Maciej BryƄski
              Votes:
              1 Vote for this issue
              Watchers:
              7 Start watching this issue

              Dates

                Created:
                Updated:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 20m
                  20m