  Flink / FLINK-36139

ClassCastException when checkpointing AggregateFunction that implements ResultTypeQueryable interface


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.18.1
    • Fix Version/s: None
    • Component/s: API / DataStream
    • Labels: None
    • Environment: Kubernetes Native Session Cluster

    Description

      Consider a simple AggregateFunction that uses a custom Accumulator and also implements the ResultTypeQueryable interface:

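
          import java.io.Serializable;
          import java.util.ArrayList;
          import java.util.List;

          import lombok.Getter;
          import org.apache.flink.api.common.functions.AggregateFunction;
          import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
          import org.apache.flink.api.java.typeutils.RowTypeInfo;
          import org.apache.flink.types.Row;

          // Enclosing class name taken from the stack trace (Batch$BatchingFunction$Accumulator);
          // "..." marks code elided in the original report.
          public class Batch {

              static class BatchingFunction implements AggregateFunction<Row, BatchingFunction.Accumulator, Row>, ResultTypeQueryable<Row> {

                  // Lombok generates getProducedType(), which implements ResultTypeQueryable<Row>:
                  // it describes the function's output type (OUT), not its accumulator type (ACC).
                  @Getter
                  private final RowTypeInfo producedType;

                  public BatchingFunction() {
                      this.producedType = ...
                  }

                  @Override
                  public Accumulator createAccumulator() {
                      return new Accumulator();
                  }

                  @Override
                  public Accumulator add(Row row, Accumulator acc) {
                      acc.add(row);
                      return acc;
                  }

                  @Override
                  public Accumulator merge(Accumulator acc1, Accumulator acc2) {
                      acc1.merge(acc2);
                      return acc1;
                  }

                  @Override
                  public Row getResult(Accumulator accumulator) {
                      ...
                  }

                  // Custom accumulator; this is the object that ends up being passed to RowSerializer.
                  private static class Accumulator implements Serializable {

                      private final List<Row> rows = new ArrayList<>();

                      List<Row> getAll() {
                          return rows;
                      }

                      // Moves all rows from acc2 into this accumulator and empties acc2.
                      Accumulator merge(Accumulator acc2) {
                          this.rows.addAll(acc2.rows);
                          acc2.clear();
                          return this;
                      }

                      void add(Row row) {
                          rows.add(row);
                      }

                      void clear() {
                          rows.clear();
                      }
                  }
              }
          }
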

      When a job that uses this BatchingFunction is resubmitted to a Flink Kubernetes session cluster with aligned checkpoints enabled, the following ClassCastException appears in the JobManager logs:

      org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.18.1.jar:1.18.1]
          at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
          at java.lang.Thread.run(Unknown Source) [?:?]
      Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 1 for operator transform -> (Sink: metrics_transform, sink: Writer -> sink: Committer) (1/1)#0.
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.18.1.jar:1.18.1]
          ... 4 more
      Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
          at java.util.concurrent.FutureTask.report(Unknown Source) ~[?:?]
          at java.util.concurrent.FutureTask.get(Unknown Source) ~[?:?]
          at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:511) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
          ... 3 more
      Caused by: org.apache.flink.util.SerializedThrowable: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
          at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:69) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:147) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist-1.18.1.jar:1.18.1]
          at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
          at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
          at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
          ... 3 more
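      The innermost frames show the cast failing in the heap state backend's snapshot path (CopyOnWriteStateMapSnapshot.writeState calling RowSerializer.serialize), not when the accumulator is updated. The plain-Java model below is a simplified sketch of why a mismatched serializer can go unnoticed until a checkpoint, assuming heap-style state that keeps live objects and serializes them only at snapshot time. Every name in it (HeapState, ToyRowSerializer, snapshotSize, and the stand-in Row and Accumulator classes) is hypothetical and is not Flink API. Under this assumption, an empty state snapshots cleanly while a state holding an Accumulator fails, which might be one reason the failure looks intermittent; the report does not confirm this.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeapSnapshotSketch {
    // Hypothetical stand-in for org.apache.flink.types.Row.
    public static final class Row {
        final String value;
        public Row(String value) { this.value = value; }
    }

    // Hypothetical stand-in for the issue's Accumulator.
    public static final class Accumulator {
        final List<Row> rows = new ArrayList<>();
    }

    // Hypothetical stand-in for a serializer of values of type T.
    public interface Serializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;
    }

    // Plays the role RowSerializer plays in the stack trace.
    public static final class ToyRowSerializer implements Serializer<Row> {
        @Override
        public void serialize(Row row, DataOutputStream out) throws IOException {
            out.writeUTF(row.value);
        }
    }

    // Heap-style state: values are kept as live objects and only
    // serialized when a snapshot is taken.
    public static final class HeapState {
        private final Map<String, Object> values = new LinkedHashMap<>();
        private final Serializer<Object> serializer;

        @SuppressWarnings("unchecked")
        public HeapState(Serializer<?> serializer) {
            // Unchecked cast: nothing verifies the value type until snapshot().
            this.serializer = (Serializer<Object>) serializer;
        }

        public void put(String key, Object value) {
            values.put(key, value); // no serialization here, so no type check either
        }

        public byte[] snapshot() {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                for (Object value : values.values()) {
                    // The compiler-generated bridge method casts value to Row,
                    // so a non-Row value fails here with ClassCastException.
                    serializer.serialize(value, out);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bytes.toByteArray();
        }
    }

    // Snapshots a state holding the given values, using the Row serializer for all of them.
    public static int snapshotSize(List<?> values) {
        HeapState state = new HeapState(new ToyRowSerializer());
        for (int i = 0; i < values.size(); i++) {
            state.put("key-" + i, values.get(i));
        }
        return state.snapshot().length;
    }

    public static void main(String[] args) {
        System.out.println("empty state: " + snapshotSize(List.of()) + " bytes");
        System.out.println("row state: " + snapshotSize(List.of(new Row("a"))) + " bytes");
        try {
            snapshotSize(List.of(new Accumulator()));
        } catch (ClassCastException e) {
            System.out.println("accumulator state: " + e.getClass().getSimpleName());
        }
    }
}
```

      In this model, put() accepts an Accumulator without complaint and the failure surfaces only inside snapshot(), mirroring where the real trace fails.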

      In this case, Flink uses the wrong serializer (RowSerializer, according to the trace) to write Accumulator objects to state. The behavior is also not deterministic: when I launch the job for the first time on a new cluster, no such errors are observed during checkpoints.
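      One plausible cause, stated as an assumption rather than a confirmed diagnosis: Flink's TypeExtractor gives an explicit ResultTypeQueryable type precedence when it resolves a function's type parameters. If that same check also applies when the accumulator type is resolved, BatchingFunction's accumulator would be assigned its produced RowTypeInfo, and therefore RowSerializer. The model below only illustrates that precedence rule. All of its names (TypeInfo, Aggregate, TypeQueryable, BatchingLike, PlainAggregate, resolveAccumulatorType) are hypothetical stand-ins, not Flink API. In the model, a caller-supplied accumulator type bypasses the problem. If the job uses windowed aggregation, the WindowedStream.aggregate overload that takes explicit accumulator and result TypeInformation might serve the same purpose; this has not been verified against the reporter's job.

```java
public class AccumulatorTypeSketch {
    // Hypothetical stand-in for TypeInformation: only records which serializer would be chosen.
    public enum TypeInfo { ROW, ACCUMULATOR }

    // Hypothetical stand-in for AggregateFunction<IN, ACC, OUT>; declaredAccumulatorType()
    // represents what reflective analysis of the ACC type parameter would find.
    public interface Aggregate {
        TypeInfo declaredAccumulatorType();
    }

    // Hypothetical stand-in for ResultTypeQueryable<OUT>.
    public interface TypeQueryable {
        TypeInfo producedType();
    }

    // Shaped like BatchingFunction: an aggregate that also reports its result type.
    public static final class BatchingLike implements Aggregate, TypeQueryable {
        @Override public TypeInfo declaredAccumulatorType() { return TypeInfo.ACCUMULATOR; }
        @Override public TypeInfo producedType() { return TypeInfo.ROW; }
    }

    // The same aggregate without the ResultTypeQueryable-style interface.
    public static final class PlainAggregate implements Aggregate {
        @Override public TypeInfo declaredAccumulatorType() { return TypeInfo.ACCUMULATOR; }
    }

    // Assumed precedence: explicit type from the caller, then the produced type,
    // then the declared type parameter. The middle step is the suspected problem:
    // the produced type describes OUT, yet here it also answers for ACC.
    public static TypeInfo resolveAccumulatorType(Aggregate fn, TypeInfo explicitAccumulatorType) {
        if (explicitAccumulatorType != null) {
            return explicitAccumulatorType;
        }
        if (fn instanceof TypeQueryable) {
            return ((TypeQueryable) fn).producedType();
        }
        return fn.declaredAccumulatorType();
    }

    public static void main(String[] args) {
        System.out.println("BatchingLike, inferred: " + resolveAccumulatorType(new BatchingLike(), null));
        System.out.println("BatchingLike, explicit: " + resolveAccumulatorType(new BatchingLike(), TypeInfo.ACCUMULATOR));
        System.out.println("PlainAggregate, inferred: " + resolveAccumulatorType(new PlainAggregate(), null));
    }
}
```

      The only difference between BatchingLike and PlainAggregate is the extra interface. This matches the report's observation that the problem is tied to an AggregateFunction that also implements ResultTypeQueryable.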


          People

            Assignee: Unassigned
            Reporter: Vladislav Keda (vladislav.keda)
            Votes: 0
            Watchers: 1
