Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-24544

Failure when using Kafka connector in Table API with Avro and Confluent schema registry

Attach filesAttach ScreenshotAdd voteVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      A user reported in the mailing list that Avro deserialization fails when using Kafka, Avro and Confluent Schema Registry:

      Caused by: java.io.IOException: Failed to deserialize Avro record.
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)  
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
        at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179) 
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
      Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, expecting union
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
        at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
        at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
        ... 9 more
      

      Look in the attachments for a reproducer.

      Same data serialized to a file works fine (look the filesystem example in the reproducer)

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            Unassigned Unassigned
            slinkydeveloper Francesco Guardiani

            Dates

              Created:
              Updated:

              Slack

                Issue deployment