Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-24544

Failure when using Kafka connector in Table API with Avro and Confluent schema registry

    XMLWordPrintableJSON

Details

    Description

      A user reported in the mailing list that Avro deserialization fails when using Kafka, Avro and Confluent Schema Registry:

      Caused by: java.io.IOException: Failed to deserialize Avro record.
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)  
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
        at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) 
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
      Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
        at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
        ... 9 more
      

      Look in the attachments for a reproducer.

      Same data serialized to a file works fine (look the filesystem example in the reproducer)

      Attachments

        1. flink-deser-avro-enum.zip
          14 kB
          Francesco Guardiani

        Activity

          People

            Unassigned Unassigned
            slinkydeveloper Francesco Guardiani
            Votes:
            1 Vote for this issue
            Watchers:
            8 Start watching this issue

            Dates

              Created:
              Updated: