Details
- Bug
- Status: Resolved
- P2
- Resolution: Fixed
- 2.11.0
- Direct Runner
Description
Below is my pipeline:
KafkaSource (KafkaIO.read) ----------> ParDo -------> BeamSql -----> KafkaSink (KafkaIO.write)
The Kafka source reads Avro records from a Kafka topic and deserializes them to GenericRecord using the code below:
KafkaIO.Read<String, GenericRecord> kafkaIoRead = KafkaIO.<String, GenericRecord>read()
    .withBootstrapServers(bootstrapServerUrl)
    .withTopic(topicName)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
        AvroCoder.of(GenericRecord.class, avroSchema))
    .updateConsumerProperties(ImmutableMap.of("schema.registry.url",
        schemaRegistryUrl));
The Avro schema of the topic has a field with a logical type (timestamp-millis). This field is deserialized to a Joda-Time DateTime.
{
  "name": "timeOfRelease",
  "type": [
    "null",
    {"type": "long", "logicalType": "timestamp-millis"}
  ],
  "default": null
}
Now, in my ParDo transform, I am trying to use the AvroUtils class to convert the GenericRecord to a Beam Row, and I get the ClassCastException below:
AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.Long
at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
This looks like a bug: the Joda-Time value created during deserialization is being cast to Long in the code below.
else if (logicalType instanceof LogicalTypes.TimestampMillis) {
    return convertDateTimeStrict((Long) value, fieldType);
}
PS: I also used the avro-tools 1.8.2 jar to generate the classes for this Avro schema, and I see that the attribute with the timestamp-millis logical type is generated as a Joda-Time type.
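For illustration, the failure mode and one possible defensive conversion can be sketched in plain Java. This is only a sketch, not the actual Beam code or the fix that was applied: `TimestampMillisValues` and its methods are hypothetical names, and `java.time.Instant` stands in for `org.joda.time.DateTime` so the example compiles without Joda-Time on the classpath. With Joda-Time available, the corresponding branch would presumably check for `org.joda.time.ReadableInstant` and call `getMillis()`.

```java
import java.time.Instant;
import java.util.Date;

// Hypothetical helper (not part of Beam): normalizes a timestamp-millis field value
// that may arrive either as a raw Long or as an already-converted date-time object.
final class TimestampMillisValues {

    private TimestampMillisValues() {}

    // Mirrors the failing pattern from the report: an unconditional cast to Long.
    // Throws ClassCastException when the deserializer already produced a date-time object.
    static long strictCast(Object value) {
        return (Long) value;
    }

    // Tolerant variant: inspects the runtime type before converting to epoch milliseconds.
    // Assumption: Instant is a stand-in for the Joda-Time type seen in the stack trace.
    static long toEpochMillis(Object value) {
        if (value instanceof Long) {
            return (Long) value;
        }
        if (value instanceof Instant) {
            return ((Instant) value).toEpochMilli();
        }
        if (value instanceof Date) {
            return ((Date) value).getTime();
        }
        // Null (the field is nullable in the schema) and unknown types are rejected here;
        // a caller would need to handle null before converting.
        String type = (value == null) ? "null" : value.getClass().getName();
        throw new IllegalArgumentException("Unsupported timestamp-millis value type: " + type);
    }
}
```

Calling `strictCast` with an `Instant` fails the same way the report describes, while `toEpochMillis` accepts both the raw `Long` and the converted value.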