Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-17486

ClassCastException when checkpointing AVRO SpecificRecord with decimal fields

    XMLWordPrintableJSON

Details

    Description

      When consuming from a Kafka source AVRO SpecificRecord containing a decimal (logical type) field, copying the record fails with:

      java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.nio.ByteBuffer

      I understand the problem arises when Flink tries to make a deep-copy of the record for checkpointing.

      This code reproduces the problem (https://github.com/nicusX/flink-avro-bug/blob/master/src/test/java/example/TestDeepCopy.java):

       

      AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);
      Sample s1 = Sample.newBuilder()
         .setPrice(BigDecimal.valueOf(42.32))
         .setId("A12345")
         .build();
      Sample s2 = serializer.copy(s1);
      

       

       

      The AVRO SpecificRecord is generated from this IDL (using the maven-avro-plugin):

      @namespace("example.avro")
       protocol SampleProtocol {
         record Sample{
           string id;
           decimal(9,2) price;
           timestamp_ms eventTime;
          }
       }

      In particular, I had the problem after attaching an AssignerWithPeriodicWatermark to a Kafka Source consuming AVRO SpecificRecord and using Confluent Schema Registry. The assigned extracts the event time from the record and enabling bookmarking (not sure whether this is related).
      A simplified version of the application is here: https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java

       

      The problem looks similar to AVRO-1895 but that issue has been fixed since AVRO 1.8.2.

      In fact, the following code does deep-copy only relying on AVRO and does work:  

      Sample s1 = Sample.newBuilder()
         .setPrice(BigDecimal.valueOf(42.32))
         .setId("A12345")
         .build();
       Sample s2 = Sample.newBuilder(s1).build();

       

      Code of the two tests and simplified application: https://github.com/nicusX/flink-avro-bug

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            nicusX Lorenzo Nicora
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated: