Beam / BEAM-10759

KafkaIO with Avro deserializer fails with evolved schema

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P2
    • Resolution: Fixed
    • Affects Version/s: 2.23.0
    • Fix Version/s: 2.26.0
    • Component/s: io-java-kafka
    • Labels: None

    Description

      When using KafkaIO with ConfluentSchemaRegistryDeserializerProvider, an exception can be thrown when consuming a topic whose schema has evolved.

      This happens because, when the DeserializerProvider is initialized, it creates an AvroCoder instance using either the latest Avro schema (by default) or a specific schema version if one is provided.

      If the Kafka topic contains records written with multiple schema versions, AvroCoder will fail on records whose schema differs from the one it was built with. The exact exception depends on the schema change; for example, I have encountered a ClassCastException and a NullPointerException.
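      To illustrate the kind of evolution that triggers this, here is a hypothetical pair of schema versions for the same subject (the record and field names are made up for this example). Records written with v1 lack the `email` field, so a coder built only against v2 can fail when it encounters them:

      ```json
      // v1 — original writer schema
      {
        "type": "record",
        "name": "User",
        "fields": [
          {"name": "id",   "type": "long"},
          {"name": "name", "type": "string"}
        ]
      }

      // v2 — evolved schema: adds an optional field with a default,
      // which is a backward-compatible change under Avro's rules
      {
        "type": "record",
        "name": "User",
        "fields": [
          {"name": "id",    "type": "long"},
          {"name": "name",  "type": "string"},
          {"name": "email", "type": ["null", "string"], "default": null}
        ]
      }
      ```

      Avro can resolve v1 records against v2 (filling in the default), but only if the deserializer is told to use v2 as the reader schema.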

      To fix this issue, we can use Avro's writer/reader schema resolution to deserialize every Kafka record into the same schema that the AvroCoder was built with. A suitable method is already available in io.confluent.kafka.serializers.KafkaAvroDeserializer:

          // Passing a reader schema makes Avro resolve each record's writer
          // schema to the given schema during deserialization.
          public Object deserialize(String s, byte[] bytes, Schema readerSchema) {
              return this.deserialize(bytes, readerSchema);
          }
      
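      For context, the consumer side can be sketched as below. This is a minimal sketch assuming Beam's KafkaIO and its ConfluentSchemaRegistryDeserializerProvider; the bootstrap servers, topic name, subject, and registry URL are placeholders, not values from this issue:

      ```java
      import org.apache.avro.generic.GenericRecord;
      import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
      import org.apache.beam.sdk.io.kafka.KafkaIO;
      import org.apache.kafka.common.serialization.LongDeserializer;

      // Reads Avro records from Kafka, letting the provider fetch the schema
      // from the Confluent Schema Registry. With the fix, records written
      // with older schema versions are resolved to the provider's schema
      // instead of failing inside AvroCoder.
      KafkaIO.Read<Long, GenericRecord> read =
          KafkaIO.<Long, GenericRecord>read()
              .withBootstrapServers("broker:9092")          // placeholder
              .withTopic("users")                            // placeholder
              .withKeyDeserializer(LongDeserializer.class)
              .withValueDeserializer(
                  ConfluentSchemaRegistryDeserializerProvider.of(
                      "http://registry:8081",                // placeholder URL
                      "users-value"));                       // placeholder subject
      ```

      Omitting the version argument to `of(...)` uses the latest registered schema as the reader schema; a third argument pins a specific version.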


            People

              Assignee: Dennis Yung
              Reporter: Dennis Yung
              Votes: 0
              Watchers: 3


                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 2h