Beam / BEAM-2807

NullPointerException during checkpoint on FlinkRunner

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P0
    • Resolution: Fixed
    • Affects Version/s: 2.1.0
    • Fix Version/s: 2.2.0
    • Component/s: runner-flink
    • Labels: None

    Description

      Beam version: 2.1.0
      Runner: FlinkRunner

      We're seeing the following exception when checkpointing, which causes our job to restart:

      2017-08-25 09:42:17,658 INFO  org.apache.flink.runtime.taskmanager.Task                     - Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32) (f00a31b722a31030f18d83ac613de21d) switched from RUNNING to FAILED.
      AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32).}
          at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:966)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator Combine.perKey(Count) -> ParMultiDo(ToConcurrentStreamResult) -> ParMultiDo(Anonymous) -> ParMultiDo(ApplyShardingKey) -> ToKeyedWorkItem (7/32).
          ... 6 more
      Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
          at java.util.concurrent.FutureTask.report(FutureTask.java:122)
          at java.util.concurrent.FutureTask.get(FutureTask.java:192)
          at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
          at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
          ... 5 more
          Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
              at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
              at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1018)
              at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:957)
              ... 5 more
          Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
              at java.util.concurrent.FutureTask.report(FutureTask.java:122)
              at java.util.concurrent.FutureTask.get(FutureTask.java:192)
              at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
              at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
              at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
              ... 7 more
          Caused by: java.lang.NullPointerException
              at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
              at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
              at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$CoderTypeSerializerConfigSnapshot.write(CoderTypeSerializer.java:189)
              at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.write(TypeSerializerSerializationUtil.java:413)
              at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:229)
              at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:151)
              at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoWriterV3.writeStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:107)
              at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.write(KeyedBackendSerializationProxy.java:104)
              at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:293)
              at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:286)
              at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:329)
              at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)
              at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
              at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
              at org.apache.flink.streaming.runtime.io.BarrierTracker.notifyCheckpoint(BarrierTracker.java:263)
              at org.apache.flink.streaming.runtime.io.BarrierTracker.processBarrier(BarrierTracker.java:178)
              at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:97)
              at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
              at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
              ... 1 more
      

      Debugging locally, I've narrowed it down to this part of the trace:

          Caused by: java.lang.NullPointerException
              at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
              at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
              at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer$CoderTypeSerializerConfigSnapshot.write(CoderTypeSerializer.java:189)
              at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.write(TypeSerializerSerializationUtil.java:413)
      

      Specifically, line 189 of CoderTypeSerializer.java calls DataOutputStream.writeUTF(String), and writeUTF reads the length of the string it is given. That is what throws the NPE, because the coderName field is null.
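
      The failure mode described above can be reproduced outside Beam and Flink. The following is a minimal sketch using only the JDK; the class name WriteUtfNullDemo and the helper writeUtfThrowsNpe are hypothetical names made up for illustration and are not part of CoderTypeSerializer.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical demo class: shows that DataOutputStream.writeUTF rejects null.
class WriteUtfNullDemo {

    /** Returns true if writeUTF(value) throws a NullPointerException. */
    static boolean writeUtfThrowsNpe(String value) {
        try (DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream())) {
            // writeUTF needs the string's length before writing, so null fails here.
            out.writeUTF(value);
            return false;
        } catch (NullPointerException e) {
            return true;
        } catch (IOException e) {
            // Not expected for an in-memory stream; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeUtfThrowsNpe("some.coder.Name")); // prints false
        System.out.println(writeUtfThrowsNpe(null));              // prints true
    }
}
```

      So any serializer that passes a possibly-null name straight to writeUTF will fail at snapshot time rather than at construction time, which matches where the trace above blows up.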

      I think this stems from the constructor, which sets coderName by calling .getClass().getCanonicalName() on the coder that is passed in.

      While debugging I noticed that .getClass().getCanonicalName() returns null when called on an instance of Count$CountFn.
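
      For context, the JDK documents that Class.getCanonicalName() returns null for classes that have no canonical name, such as anonymous and local classes, whereas Class.getName() always returns a binary name. The sketch below illustrates that difference and one possible null-safe way to derive a name. Everything in it (CanonicalNameDemo, FakeCoder, NamedCoder, anonymousCoder, safeCoderName) is a hypothetical stand-in, not Beam code, and the fallback shown is an assumption about one workable approach, not necessarily the fix that shipped.

```java
import java.io.Serializable;

// Hypothetical demo class: canonical names versus binary names.
class CanonicalNameDemo {

    /** Hypothetical stand-in for a coder type; not the real Beam Coder. */
    interface FakeCoder extends Serializable {}

    /** A named static nested class, which does have a canonical name. */
    static class NamedCoder implements FakeCoder {}

    /** Returns an anonymous FakeCoder, which has no canonical name. */
    static FakeCoder anonymousCoder() {
        return new FakeCoder() {};
    }

    /** Null-safe name: prefer the canonical name, else fall back to getName(). */
    static String safeCoderName(Object coder) {
        String canonical = coder.getClass().getCanonicalName();
        return canonical != null ? canonical : coder.getClass().getName();
    }

    public static void main(String[] args) {
        FakeCoder anonymous = anonymousCoder();
        System.out.println(anonymous.getClass().getCanonicalName()); // prints null
        System.out.println(anonymous.getClass().getName());
        System.out.println(safeCoderName(anonymous));
        System.out.println(safeCoderName(new NamedCoder()));
    }
}
```

      A name obtained this way is never null, so it can be written with writeUTF without triggering the NPE seen during the checkpoint.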

            People

              Assignee: Daniel Harper (djharper)
              Reporter: Daniel Harper (djharper)