Details
Sub-task
Status: Closed
Major
Resolution: Duplicate
1.10.0
None
None
Description
We hit the following issue when trying to use Avro logical timestamp types:
CREATE TABLE source_table (
    int_field INT,
    timestamp_field TIMESTAMP(3)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'avro_tset',
    'connector.properties.bootstrap.servers' = '<...>',
    'format.type' = 'avro',
    'format.avro-schema' = '{
        "type": "record",
        "name": "test",
        "fields" : [
            {"name": "int_field", "type": "int"},
            {"name": "timestamp_field", "type": {"type":"long", "logicalType": "timestamp-millis"}}
        ]
    }'
)

INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11');
And the error:
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.lang.Long
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)
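For readers unfamiliar with this kind of failure, the following is a minimal sketch that uses only the JDK. It is not Avro's or Flink's code: the class CastFailureSketch and the method writeLong are hypothetical stand-ins for a writer that expects the raw long backing a timestamp-millis field and receives a LocalDateTime instead.

```java
import java.time.LocalDateTime;

public class CastFailureSketch {

    // Hypothetical stand-in for a writer that treats the field value as the
    // raw long of a "timestamp-millis" field, with no logical type conversion.
    static long writeLong(Object datum) {
        return (Long) datum; // throws ClassCastException if datum is not a Long
    }

    public static void main(String[] args) {
        // The value type that reaches the writer in the trace above.
        Object value = LocalDateTime.of(1999, 11, 11, 11, 11, 11);
        try {
            writeLong(value);
        } catch (ClassCastException e) {
            // The exact message text differs between JDK versions.
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

The cast itself compiles because the value is handled as an Object, so the mismatch only shows up at runtime, which matches where the trace above fails.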
Dawid's analysis from the mailing list discussion:
It seems that the information about the bridging class (java.sql.Timestamp in this case) is lost in the stack. Because this information is lost or not respected, the planner produces a LocalDateTime instead of a proper java.sql.Timestamp. The AvroRowSerializationSchema expects java.sql.Timestamp for a column of TIMESTAMP type, and thus it fails for LocalDateTime.
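To make the type mismatch concrete, here is a small sketch of how a LocalDateTime relates to the two representations mentioned above: java.sql.Timestamp (the bridging class the serializer expects) and the epoch-millisecond long that an Avro timestamp-millis field stores. The class and method names are hypothetical, and interpreting the LocalDateTime as UTC is an assumption made for illustration; the issue does not state which time zone handling Flink applies.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TimestampBridgingSketch {

    // Bridge to java.sql.Timestamp. Timestamp.valueOf keeps the same
    // wall-clock fields, interpreted in the JVM's default time zone.
    static Timestamp toSqlTimestamp(LocalDateTime ldt) {
        return Timestamp.valueOf(ldt);
    }

    // Epoch milliseconds as stored by an Avro "timestamp-millis" long.
    // Assumption: the local date-time is interpreted as UTC.
    static long toEpochMillisUtc(LocalDateTime ldt) {
        return ldt.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime value = LocalDateTime.of(1999, 11, 11, 11, 11, 11);
        System.out.println(toSqlTimestamp(value));
        System.out.println(toEpochMillisUtc(value));
    }
}
```

Either conversion would give the Avro writer a type it can handle. Which one is appropriate depends on where in the stack the bridging information is restored, which this sketch does not try to decide.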