Details

      Description

      Flink 1.2-SNAPSHOT (Commit: 5b54009) on Windows.

      Triggering a savepoint for a streaming job caused both the savepoint and the job to fail.

      The job failed with the following exception:

      java.lang.RuntimeException: Error while triggering checkpoint for IterationSource-7 (1/1)
      	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1026)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
      	at java.util.concurrent.FutureTask.run(Unknown Source)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
      	at java.lang.Thread.run(Unknown Source)
      Caused by: java.lang.NullPointerException
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorIdentifier(StreamTask.java:767)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.access$500(StreamTask.java:115)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.createStreamFactory(StreamTask.java:986)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:956)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:583)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:551)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:511)
      	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1019)
      	... 5 more
      
      And the savepoint failed with the following exception:
      
      Using address /127.0.0.1:6123 to connect to JobManager.
      Triggering savepoint for job 153310c4a836a92ce69151757c6b73f1.
      Waiting for response...
      
      ------------------------------------------------------------
       The program finished with the following exception:
      
      java.lang.Exception: Failed to complete savepoint
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:793)
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:782)
              at org.apache.flink.runtime.concurrent.impl.FlinkFuture$6.recover(FlinkFuture.java:263)
              at akka.dispatch.Recover.internal(Future.scala:267)
              at akka.dispatch.japi$RecoverBridge.apply(Future.scala:183)
              at akka.dispatch.japi$RecoverBridge.apply(Future.scala:181)
              at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
              at scala.util.Try$.apply(Try.scala:161)
              at scala.util.Failure.recover(Try.scala:185)
              at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
              at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
              at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
              at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
              at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
              at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
              at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
              at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
              at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
              at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
              at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
              at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
              at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
              at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
              at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.Exception: Checkpoint failed: Checkpoint Coordinator is shutting down
              at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortError(PendingCheckpoint.java:338)
              at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.shutdown(CheckpointCoordinator.java:245)
              at org.apache.flink.runtime.executiongraph.ExecutionGraph.postRunCleanup(ExecutionGraph.java:1065)
              at org.apache.flink.runtime.executiongraph.ExecutionGraph.jobVertexInFinalState(ExecutionGraph.java:1034)
              at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.subtaskInFinalState(ExecutionJobVertex.java:435)
              at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.vertexCancelled(ExecutionJobVertex.java:407)
              at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionCanceled(ExecutionVertex.java:593)
              at org.apache.flink.runtime.executiongraph.Execution.cancelingComplete(Execution.java:729)
              at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1105)
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:687)
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:686)
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:686)
              at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
              at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
              at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
              at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
              at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
              at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
              at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
              ... 2 more
      Caused by: java.lang.Exception: Checkpoint Coordinator is shutting down
              ... 20 more
      

      Possibly worth mentioning: the iteration body contains a MapFunction, and its thread was in a sleep state (added manually) during the savepoint.

      1. SavepointBug.java
        4 kB
        Yassine Marzougui

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user senorcarbone commented on the issue:

          https://github.com/apache/flink/pull/3088

          Hey! In this upcoming [PR](https://github.com/apache/flink/pull/1668) there is a custom operator responsible for the logging at the iteration head. I hope this does not break your tests in case they assume a null operator.
          Also, if any of you is familiar with asynchronous checkpoints with ListState, could you check my last question in the same PR? Thanks.

          aljoscha Aljoscha Krettek added a comment -

          Fixed on master in:
          9c6eb5793258de15a83f4cf7b13180d370062531
          82ed799999e3f05ebfd67d69dfb56ff13dbd497a

          Fixed on release-1.2 in:
          81eaafac70a9ec543ae2e81b6dd006d80c137fa5
          9e2b16e5d4d775ea0b806aa500f4e49e10f2a917

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter closed the pull request at:

          https://github.com/apache/flink/pull/3088

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3088

          Thanks for reviewing and merging @aljoscha !

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3088

          Merged, could you please close this PR?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3088

          Changes look very good! I fixed the formatting of the newly added methods in `TestingCluster` to conform to Scala coding guidelines.

          I rebased on master, will wait for Travis to give the green light and then merge.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3088

          cc @aljoscha

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/3088

          FLINK-5407 Fix savepoints for iterative jobs

          This PR fixes savepoints for iterative jobs. Savepoints failed with NPE because the code assumed that operators in an operator chain are never null. For iterative jobs, this can happen.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink NPE-Iterative-Snapshot

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3088.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3088


          commit 984d596c063b5082520d8d58baa6b7361b1e9921
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-01-05T13:28:50Z

          FLINK-5407 Handle snapshoting null-operator in chain

          commit c96fe7ba35764b4f9e05ed61199b2027981daa54
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-01-10T15:08:06Z

          FLINK-5407 IT case for savepoint with iterative job
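
          The PR summary above attributes the NPE to code that assumed every operator in a chain is non-null. A minimal sketch of that kind of guard follows. It is not the actual Flink patch: the `Operator` interface, the `snapshotIdentifiers` method, and the identifier format are hypothetical names made up for illustration, and the sketch only shows skipping null slots while walking a chain.

```java
import java.util.ArrayList;
import java.util.List;

public class NullOperatorChainSketch {

    /** Hypothetical stand-in for a stream operator; not Flink's real interface. */
    interface Operator {
        String name();
    }

    /**
     * Builds one identifier per non-null operator in the chain.
     * Null slots (which, per the PR description, can occur in iterative jobs)
     * are skipped instead of being dereferenced.
     */
    static List<String> snapshotIdentifiers(Operator[] chain, long checkpointId) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < chain.length; i++) {
            Operator op = chain[i];
            if (op == null) {
                continue; // nothing to snapshot for this slot
            }
            // Assumed identifier format: name, chain index, checkpoint id.
            ids.add(op.name() + "_" + i + "_" + checkpointId);
        }
        return ids;
    }

    public static void main(String[] args) {
        Operator map = () -> "Map";
        // First slot is null, mimicking a chain position without an operator.
        Operator[] chain = { null, map };
        System.out.println(snapshotIdentifiers(chain, 7L));
    }
}
```

          Without the null check, the call to `op.name()` on the first slot would throw a NullPointerException, which is the same class of failure as the one in the stack trace reported in this issue.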


          ymarzougui Yassine Marzougui added a comment -

          I attached a simplified version of the job that reproduces the bug and confirmed it against the latest master (Commit: 6ac5794). I noticed that if I remove the lines:

          .keyBy(0)
          .flatMap(new DuplicateFilter()).setParallelism(1)
          

          the job doesn't fail, but the savepoint is stuck in the state:
          Triggering savepoint for job 4f4d0b4308aabc21a243ec34e4c193ba.
          Waiting for response...


            People

            • Assignee:
              srichter Stefan Richter
              Reporter:
              StephanEwen Stephan Ewen
            • Votes:
              0
              Watchers:
              4
