Details
Type: Bug
Status: Open
Priority: Not a Priority
Resolution: Unresolved
Affects Version/s: 1.10.0
Fix Version/s: None
Environment:
Flink 1.10.0
AVRO 1.9.2
Java 1.8.0 (but also Java 14)
Scala binary 2.11
Description
When consuming an AVRO SpecificRecord containing a decimal (logical type) field from a Kafka source, copying the record fails with:
java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.nio.ByteBuffer
I understand the problem arises when Flink tries to make a deep-copy of the record for checkpointing.
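For context, an AVRO decimal logical type is physically represented as the two's-complement bytes of the BigDecimal's unscaled integer, carried in a ByteBuffer, with the scale fixed by the schema. The helper names below are mine: a plain-Java sketch of what Avro's DecimalConversion does, not actual Flink or Avro code:

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

public class DecimalBytesDemo {
    // Encode a decimal value the way Avro's DecimalConversion does:
    // the two's-complement bytes of the unscaled integer; the scale
    // itself is carried by the schema, not the payload.
    static ByteBuffer toBytes(BigDecimal value, int scale) {
        return ByteBuffer.wrap(value.setScale(scale).unscaledValue().toByteArray());
    }

    // Decode back: rebuild the unscaled integer and reapply the scale.
    static BigDecimal fromBytes(ByteBuffer buf, int scale) {
        byte[] bytes = new byte[buf.remaining()];
        buf.get(bytes);
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    public static void main(String[] args) {
        BigDecimal price = BigDecimal.valueOf(42.32);
        System.out.println(fromBytes(toBytes(price, 2), 2)); // prints 42.32
    }
}
```

If a copy path casts the field directly to ByteBuffer instead of going through such a conversion, a ClassCastException like the one above is exactly what one would expect.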
This code reproduces the problem (https://github.com/nicusX/flink-avro-bug/blob/master/src/test/java/example/TestDeepCopy.java):
AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);
Sample s1 = Sample.newBuilder()
    .setPrice(BigDecimal.valueOf(42.32))
    .setId("A12345")
    .build();
Sample s2 = serializer.copy(s1);
The AVRO SpecificRecord is generated from this IDL (using the maven-avro-plugin):
@namespace("example.avro")
protocol SampleProtocol {
record Sample{
string id;
decimal(9,2) price;
timestamp_ms eventTime;
}
}
In particular, I had the problem after attaching an AssignerWithPeriodicWatermarks to a Kafka source consuming AVRO SpecificRecord and using Confluent Schema Registry. The assigner extracts the event time from the record, with bookmarking enabled (not sure whether this is related).
A simplified version of the application is here: https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java
The problem looks similar to AVRO-1895, but that issue was fixed in AVRO 1.8.2.
In fact, the following code does a deep copy relying only on AVRO, and it works:
Sample s1 = Sample.newBuilder()
.setPrice(BigDecimal.valueOf(42.32))
.setId("A12345")
.build();
Sample s2 = Sample.newBuilder(s1).build();
Code of the two tests and simplified application: https://github.com/nicusX/flink-avro-bug
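For what it's worth, a possible workaround (my assumption, not a confirmed fix) is that the copy fails because no DecimalConversion is registered on the SpecificData model used during the deep copy, so the BigDecimal field is treated as its raw ByteBuffer encoding. Registering the conversion explicitly might avoid the cast:

```java
import org.apache.avro.Conversions;
import org.apache.avro.specific.SpecificData;

public class RegisterDecimalConversion {
    public static void main(String[] args) {
        // Assumption: registering the decimal conversion on the model
        // makes Avro convert BigDecimal <-> ByteBuffer during deepCopy
        // instead of casting the field directly.
        SpecificData model = SpecificData.get();
        model.addLogicalTypeConversion(new Conversions.DecimalConversion());
    }
}
```

Alternatively, generating the SpecificRecord with enableDecimalLogicalType so the generated class carries its own conversions might have the same effect; I have not verified either against the AvroSerializer path.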