Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.18.1
Environment: Kubernetes Native Session Cluster
Description
Consider a simple example of an AggregateFunction with a custom Accumulator:
```java
static class BatchingFunction
        implements AggregateFunction<Row, BatchingFunction.Accumulator, Row>, ResultTypeQueryable<Row> {

    @Getter
    private final RowTypeInfo producedType;

    public BatchingFunction() {
        this.producedType = ...
    }

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    @Override
    public Accumulator add(Row row, Accumulator acc) {
        acc.add(row);
        return acc;
    }

    @Override
    public Accumulator merge(Accumulator acc1, Accumulator acc2) {
        acc1.merge(acc2);
        return acc1;
    }

    @Override
    public Row getResult(Accumulator accumulator) {
        ...
    }

    private static class Accumulator implements Serializable {
        private final List<Row> rows = new ArrayList<>();

        List<Row> getAll() {
            return rows;
        }

        Accumulator merge(Accumulator acc2) {
            this.rows.addAll(acc2.rows);
            acc2.clear();
            return this;
        }

        void add(Row row) {
            rows.add(row);
        }

        void clear() {
            rows.clear();
        }
    }
}
```
When a job that uses this BatchingFunction is resubmitted to a Flink Kubernetes session cluster with aligned checkpoints enabled, the JobManager logs show a ClassCastException:
```
org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Asynchronous task checkpoint failed.
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:320) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:155) ~[flink-dist-1.18.1.jar:1.18.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.Exception: Could not materialize checkpoint 1 for operator transform -> (Sink: metrics_transform, sink: Writer -> sink: Committer) (1/1)#0.
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.handleExecutionException(AsyncCheckpointRunnable.java:298) ~[flink-dist-1.18.1.jar:1.18.1]
	... 4 more
Caused by: org.apache.flink.util.SerializedThrowable: java.util.concurrent.ExecutionException: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
	at java.util.concurrent.FutureTask.report(Unknown Source) ~[?:?]
	at java.util.concurrent.FutureTask.get(Unknown Source) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:511) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
	... 3 more
Caused by: org.apache.flink.util.SerializedThrowable: java.lang.ClassCastException: class ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator cannot be cast to class org.apache.flink.types.Row (ru.glowbyte.streaming.core.operators.transformations.Batch$BatchingFunction$Accumulator is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3e54db3e; org.apache.flink.types.Row is in unnamed module of loader 'app')
	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:69) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateMapSnapshot.writeState(CopyOnWriteStateMapSnapshot.java:147) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.heap.AbstractStateTableSnapshot.writeStateInKeyGroup(AbstractStateTableSnapshot.java:116) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeStateInKeyGroup(CopyOnWriteStateTableSnapshot.java:38) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.heap.HeapSnapshotStrategy.lambda$asyncSnapshot$3(HeapSnapshotStrategy.java:172) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:91) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.SnapshotStrategyRunner$1.callInternal(SnapshotStrategyRunner.java:88) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:78) ~[flink-dist-1.18.1.jar:1.18.1]
	at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
	at org.apache.flink.util.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:508) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:54) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) ~[flink-dist-1.18.1.jar:1.18.1]
	... 3 more
```
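In this case Flink uses the wrong serializer to write Accumulator objects to state: the innermost cause shows RowSerializer.serialize being called on a BatchingFunction$Accumulator instance during the heap state snapshot. I would also like to note that this behavior is not consistent: when I launch the job for the first time on a new cluster, no such errors occur during checkpoints.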
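One possible explanation, offered as an assumption rather than a confirmed diagnosis: BatchingFunction implements ResultTypeQueryable<Row>, and Flink's type extraction may give an explicitly declared produced type precedence over the generic signature for every type it derives from the function, including the accumulator type. The accumulator state would then be given a Row serializer, which matches the RowSerializer frame in the trace. The self-contained Java model below (no Flink dependency) reproduces that chain; TypeQueryable, BatchingModel, TypeResolverModel, FakeRow and FakeAccumulator are hypothetical stand-ins, not Flink classes.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.flink.types.Row (NOT the real class).
final class FakeRow implements Serializable {
    final Object[] fields;

    FakeRow(Object... fields) {
        this.fields = fields;
    }
}

// Hypothetical stand-in for BatchingFunction.Accumulator: buffers rows in a list.
final class FakeAccumulator implements Serializable {
    final List<FakeRow> rows = new ArrayList<>();
}

// Hypothetical analogue of ResultTypeQueryable: the function declares its output type.
interface TypeQueryable {
    Class<?> producedType();
}

// Hypothetical analogue of BatchingFunction: declares Row as its produced (output) type.
final class BatchingModel implements TypeQueryable {
    @Override
    public Class<?> producedType() {
        return FakeRow.class;
    }
}

final class TypeResolverModel {
    private TypeResolverModel() {}

    // Assumed precedence rule: an explicitly declared produced type wins over the
    // type argument taken from the generic signature, no matter which type argument
    // (accumulator or result) is being resolved.
    static Class<?> resolve(Object function, Class<?> typeArgumentFromSignature) {
        if (function instanceof TypeQueryable) {
            return ((TypeQueryable) function).producedType();
        }
        return typeArgumentFromSignature;
    }

    // Models a serializer created for the resolved type: it casts the state value to
    // its own type before "writing" it, so a mismatch surfaces as a ClassCastException.
    static String serialize(Class<?> serializerType, Object stateValue) {
        Object typed = serializerType.cast(stateValue);
        return serializerType.getSimpleName() + " wrote " + typed.getClass().getSimpleName();
    }
}
```

If the model matches what happens in the job, declaring the accumulator type explicitly instead of letting Flink derive it from the function (for example, when the aggregation is applied through WindowedStream#aggregate, the overload that also takes the accumulator and result TypeInformation), or not implementing ResultTypeQueryable on the function, may be worth trying as a workaround; neither has been verified against this job.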