Details
- Bug
- Status: Resolved
- P0
- Resolution: Fixed
- 2.1.0
- None
Description
Beam version: 2.1.0
Runner: FlinkRunner
We're seeing the following exception when checkpointing, which causes our job to restart:
2017-08-25 09:42:17,658 INFO org.apache.flink.runtime.taskmanager.Task - Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32) (f00a31b722a31030f18d83ac613de21d) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:966)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1018)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:957)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
        ... 7 more
    Caused by: java.lang.NullPointerException
        at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
        at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$CoderTypeSerializerConfigSnapshot.write(CoderTypeSerializer.java:189)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.write(TypeSerializerSerializationUtil.java:413)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:229)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:151)
        at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoWriterV3.writeStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:107)
        at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.write(KeyedBackendSerializationProxy.java:104)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:293)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:286)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:329)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.notifyCheckpoint(BarrierTracker.java:263)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.processBarrier(BarrierTracker.java:178)
        at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:97)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        ... 1 more
From debugging locally, I've narrowed it down to this part of the trace:
Caused by: java.lang.NullPointerException
    at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
    at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$CoderTypeSerializerConfigSnapshot.write(CoderTypeSerializer.java:189)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.write(TypeSerializerSerializationUtil.java:413)
Specifically, at CoderTypeSerializer.java#189 the config snapshot calls DataOutputStream.writeUTF(String), and writeUTF reads the string's length before writing anything. Because the coderName field is null at that point, that length lookup throws the NPE.
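The writeUTF behaviour described above can be reproduced in isolation with only the JDK. This is a minimal sketch, not Beam or Flink code: the class name WriteUtfNullDemo and the helper writeUtfThrowsNpe are made up for illustration, and the string "some.Coder" is an arbitrary placeholder.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical demo class: shows that DataOutputStream.writeUTF rejects null.
public class WriteUtfNullDemo {

    /** Returns true if writeUTF(value) throws a NullPointerException. */
    public static boolean writeUtfThrowsNpe(String value) throws IOException {
        try (DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream())) {
            // writeUTF dereferences the string (to get its length) before writing.
            out.writeUTF(value);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(writeUtfThrowsNpe("some.Coder")); // prints "false"
        System.out.println(writeUtfThrowsNpe(null));         // prints "true"
    }
}
```

Any serializer that passes a possibly-null field straight to writeUTF will fail the same way, which matches the two writeUTF frames at the top of the trace.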
I think this stems from the constructor, which sets coderName by calling .getClass().getCanonicalName() on the coder passed into it.
While debugging, I noticed that .getClass().getCanonicalName() returns null when called on an instance of Count$CountFn.
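For context, Class.getCanonicalName() is documented to return null for classes that have no canonical name, such as anonymous and local classes. The sketch below shows that behaviour with plain JDK types. It assumes, without confirmation from this report, that the coder in question is such a class. The names CanonicalNameDemo, Coder, NamedCoder, anonymousCoder and safeName are hypothetical stand-ins, and the getName() fallback is only one possible guard, not necessarily the fix that was applied.

```java
// Hypothetical demo class: getCanonicalName() can be null, getName() is not.
public class CanonicalNameDemo {

    /** Hypothetical stand-in for a coder type; not the Beam Coder class. */
    public interface Coder {}

    /** A named nested class has a canonical name. */
    public static class NamedCoder implements Coder {}

    /** Returns an instance of an anonymous class, which has no canonical name. */
    public static Coder anonymousCoder() {
        return new Coder() {};
    }

    /** Falls back to the binary name when no canonical name exists. */
    public static String safeName(Object o) {
        String canonical = o.getClass().getCanonicalName();
        return canonical != null ? canonical : o.getClass().getName();
    }

    public static void main(String[] args) {
        System.out.println(new NamedCoder().getClass().getCanonicalName()); // non-null
        System.out.println(anonymousCoder().getClass().getCanonicalName()); // prints "null"
        System.out.println(safeName(anonymousCoder()));                     // non-null binary name
    }
}
```

A name obtained this way is always non-null, so it can be passed to writeUTF safely. Whether a binary name is suitable for later restoring a snapshot is a separate question that this sketch does not address.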
Issue Links
- is related to BEAM-2407 Flink CoderTypeSerializer ConfigSnapshot cannot be deserialised (Resolved)
- links to