Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-7073

AvroUtils converting generic record to Beam Row causes class cast exception

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • P2
    • Resolution: Fixed
    • 2.11.0
    • 2.14.0, 2.17.0
    • sdk-java-core
    • Direct Runner

    Description

      Below is my pipeline:

      KafkaSource (KafkaIo.read) ----------> Pardo -------> BeamSql-----> KafkaSink (KafkaIO.write)

      Kafka Source IO reads from Kafka topic avro records and deserializes it to generic record using below

      KafkaIO.Read<String, GenericRecord> kafkaIoRead = KafkaIO.<String, GenericRecord>read()
                      .withBootstrapServers(bootstrapServerUrl)

                      .withTopic(topicName)
                      .withKeyDeserializer(StringDeserializer.class)
                      .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
                                                                           AvroCoder.of(GenericRecord.class, avroSchema))   

                      .updateConsumerProperties(ImmutableMap.of("schema.registry.url",

                                                                                                                       schemaRegistryUrl));

      Avro schema of the topic has a logicaltype (timestamp-millis). This is deserialized to

      joda-time.

                {
                  "name": "timeOfRelease",
                  "type": [
                      "null",
                      

      {                     "type": "long",                     "logicalType": "timestamp-millis",                     "connect.version": 1,                     "connect.name": "org.apache.kafka.connect.data.Timestamp"                 }

                  ],
                  "default": null,
             }

      Now in my Pardo transform, I am trying to use the AvroUtils class methods to convert the generic record to Beam Row and getting below class cast exception

                   AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)

      Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.Long
          at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
          at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)

       

      This looks like a bug as joda time type created as part of deserialization is being type casted to Long in below code.

            else if (logicalType instanceof LogicalTypes.TimestampMillis)

      {               return convertDateTimeStrict((Long) value, fieldType);       }

      PS: I also used the avro-tools 1.8.2 jar to get the classes for the mentioned avro schema and I see that the attribute with timestamp-millis logical type is being converted to joda-time.

       

      Attachments

        Issue Links

          Activity

            People

              rskraba Ryan Skraba
              vbm Vishwas
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 50m
                  50m