FLINK-5663

Checkpoint fails because of closed registry

    Details

      Description

      While testing the 1.2.0 release I got the following Exception:

      2017-01-26 17:29:20,602 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (3/8) (2dbce778c4e53a39dec3558e868ceef4) switched from RUNNING to FAILED.
      java.lang.Exception: Error while triggering checkpoint 2 for Source: Custom Source (3/8)
      	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1117)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Could not perform checkpoint 2 for operator Source: Custom Source (3/8).
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:533)
      	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1108)
      	... 5 more
      Caused by: java.lang.Exception: Could not complete snapshot 2 for operator Source: Custom Source (3/8).
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:372)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1116)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1052)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:640)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:585)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:528)
      	... 6 more
      Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/Users/uce/Desktop/1-2-testing/fs/83889867a493a1dc80f6c588c071b679/chk-2/e4415d0d-719c-48df-91a9-3171ba468152 in order to obtain the stream state handle
      	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:333)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:200)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
      	... 11 more
      Caused by: java.io.IOException: Could not open output stream for state backend
      	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:368)
      	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
      	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:305)
      	... 13 more
      Caused by: java.io.IOException: Cannot register Closeable, registry is already closed. Closing argument.
      	at org.apache.flink.util.AbstractCloseableRegistry.registerClosable(AbstractCloseableRegistry.java:63)
      	at org.apache.flink.core.fs.ClosingFSDataOutputStream.wrapSafe(ClosingFSDataOutputStream.java:99)
      	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:123)
      	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:359)
      	... 15 more
      

      The job recovered and kept running.

      Stefan Richter, can this be a race with the closeable registry?

          Activity

          uce Ufuk Celebi added a comment -

          Out of 10 submissions, 3 failed with this exception.

          srichter Stefan Richter added a comment -

          I don't think this is a race condition. This exception only happens when the registry has already been closed, which in turn should only happen after the enclosing task has ended.

          srichter Stefan Richter added a comment -

          Then again, the question is whether this task had already ended and, for some reason, a checkpoint was still in the process of opening a stream. The exception itself is not problematic as long as the task had already finished before.
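
For readers unfamiliar with the failure mode, here is a minimal sketch of a registry that rejects registrations once it has been closed. It is not Flink's `AbstractCloseableRegistry`; the class names `SimpleCloseableRegistry` and `RegistryDemo` are hypothetical, and only the exception message is taken from the stack trace above.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for a closeable registry; not Flink's actual class.
class SimpleCloseableRegistry implements Closeable {
    private final Set<Closeable> closeables = new HashSet<>();
    private boolean closed;

    // Registers a resource, or closes it and fails if the registry is already closed.
    synchronized void register(Closeable c) throws IOException {
        if (closed) {
            c.close();
            throw new IOException(
                "Cannot register Closeable, registry is already closed. Closing argument.");
        }
        closeables.add(c);
    }

    // Closes every registered resource and rejects all later registrations.
    @Override
    public synchronized void close() throws IOException {
        closed = true;
        for (Closeable c : closeables) {
            c.close();
        }
        closeables.clear();
    }
}

class RegistryDemo {
    // Returns the message of the exception raised by a registration after close().
    static String lateRegistrationMessage() {
        SimpleCloseableRegistry registry = new SimpleCloseableRegistry();
        try {
            registry.close();               // e.g. the owning task has ended
            registry.register(() -> { });   // a stream opened afterwards is rejected
            return "registered";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(lateRegistrationMessage());
    }
}
```

In this sketch the late registration fails regardless of which thread attempts it, which is why the question of who still holds a reference to a closed registry matters.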

          StephanEwen Stephan Ewen added a comment -

          Looks like the task was not in any final state, so something made the registry close prematurely.

          Might be the same issue that also causes the sporadic test failures?

          srichter Stefan Richter added a comment -

          The only place this registry ever gets closed is through one method, only called at the end of a task. So the first thing that is a little less transparent from my point of view is the ThreadLocal behaviour. Do you have logs for this run? There could be some useful info outputs.

          StephanEwen Stephan Ewen added a comment -

          A few notes on what I have seen so far:

          • The actual checkpointing work that fails here is not done by the main thread of the task, but by the task's asyncCallDispatcher.
          • The asyncCallDispatcher is a single-thread executor created on the first triggerCheckpoint() message. Thus it should be a child thread of some actor system pool thread.
          • As a result, the asyncCallDispatcher should not have access to the thread local.
          • However, the FileSystem that creates the streams is a SafetyNetWrapperFileSystem because it was created by the main thread during initialization (when the KeyedStateBackend is created).

          I am wondering if that interplay is the cause of issues.

          I cannot quite understand it, though. For the asyncCallDispatcher it should not make a difference which file system it uses. It has no access to the closeable registry and should not be able to close streams.
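
The role of thread parentage in the notes above can be illustrated with a small sketch. It is not Flink code: `InheritanceDemo`, `REGISTRY`, and the string value are hypothetical placeholders. It only shows the general JDK behaviour that an `InheritableThreadLocal` value is copied when a thread is created, so a pooled worker keeps the value of whichever thread caused it to be created.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class InheritanceDemo {
    // Stand-in for a per-task registry holder; the String is a placeholder value.
    static final InheritableThreadLocal<String> REGISTRY = new InheritableThreadLocal<>();

    // Returns what a pooled worker sees when a thread that never set REGISTRY submits work.
    static String valueSeenByPooledWorker() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable noop = () -> { };
            Thread taskThread = new Thread(() -> {
                REGISTRY.set("registry-of-task-A");
                try {
                    // The pool creates its worker during this first submit, on the
                    // task thread, so the worker copies the task thread's value.
                    pool.submit(noop).get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            taskThread.start();
            taskThread.join();

            // The calling thread never set REGISTRY, yet the reused worker still holds the copy.
            Callable<String> read = REGISTRY::get;
            return pool.submit(read).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(valueSeenByPooledWorker());
    }
}
```

Whether this is what happened in the failing run is not established by the sketch; it only shows that inheritance depends on which thread creates the worker, not on which thread later submits work to it.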

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/3228

          FLINK-5663 Prevent leaking SafetyNetCloseableRegistry through InheritableThreadLocal

          This PR prevents the `SafetyNetCloseableRegistry` from leaking into pooled threads through `InheritableThreadLocal`.

          As a first step, we use `ThreadLocal` instead of `InheritableThreadLocal` to hold the closeable registries.

          Additionally, we also create safety nets for the file system at the scope of the checkpointing thread. We hope that this already covers most cases. Other threads could actually also create safety nets for their scope right now.

          As a last change, we made the reaper thread a singleton, because we could potentially create more registries now and it is not required to have one reaper thread per registry.
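
A rough sketch of the first two changes described above, under stated assumptions: `ScopedRegistryDemo` and `runWithSafetyNet` are hypothetical names, and a plain list of `AutoCloseable` stands in for the real registry. The point is only that a plain `ThreadLocal` is not visible to child threads, and that each thread opens and closes its own scope.

```java
import java.util.ArrayList;
import java.util.List;

class ScopedRegistryDemo {
    // A plain ThreadLocal: threads created by the owner do not see the owner's value.
    static final ThreadLocal<List<AutoCloseable>> REGISTRY = new ThreadLocal<>();

    // Hypothetical helper: runs an action with a registry scoped to the calling thread,
    // then closes everything that was registered during the action.
    static void runWithSafetyNet(Runnable action) {
        List<AutoCloseable> registry = new ArrayList<>();
        REGISTRY.set(registry);
        try {
            action.run();
        } finally {
            REGISTRY.remove();
            for (AutoCloseable c : registry) {
                try {
                    c.close();
                } catch (Exception ignored) {
                    // best-effort cleanup in this sketch
                }
            }
        }
    }

    // Reports whether a thread started inside the scope can see the parent's registry.
    static boolean childSeesParentRegistry() {
        boolean[] seen = new boolean[1];
        runWithSafetyNet(() -> {
            Thread child = new Thread(() -> seen[0] = REGISTRY.get() != null);
            child.start();
            try {
                child.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("child sees parent registry: " + childSeesParentRegistry());
    }
}
```

A thread that needs its own safety net, such as a checkpointing thread, would wrap its work in its own `runWithSafetyNet` call rather than relying on a value copied from its creator.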

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink safetyNet2

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3228.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3228


          commit 21e2a31ece3c56a9d79cb127f9829f770ebe56cf
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-01-27T15:32:35Z

          FLINK-5663 Prevent leaking safetynet closeable registry


          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3228

          cc @StephanEwen

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3228

          Making the registry not inherited is a good quick fix.

          How about moving the initialization and closing of that registry into the task's trigger checkpoint action? https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L1119

          That way it is tied to the dedicated asynchronous thread, which is safer, I think. Having the registry initialization/closing in the `triggerCheckpoint(...)` method means the method can never be called by the main thread. It probably does not happen currently, but it seems quite easy to accidentally violate.

          There is also some additional refactoring in this pull request that makes the reaper thread a static variable, shared across all registries. Currently that thread seems never to be stopped by anyone. How about factoring out the changes for the single static reaper thread into a separate issue (that we do not merge for 1.2) and addressing the thread stopping as well (via an atomic count of how many registries are currently open, stopping the thread when going to zero and starting it when going from zero)?
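
One possible shape of the counting idea, as a sketch only: `ReaperManager` and its methods are hypothetical, the loop body is a placeholder, and the sketch uses a lock-guarded counter instead of an atomic one so that the count and the thread's start/stop always change together.

```java
class ReaperManager {
    private static int openRegistries;  // guarded by the class lock
    private static Thread reaper;       // non-null only while at least one registry is open

    // Called when a registry is created; starts the reaper on the 0 -> 1 transition.
    static synchronized void registryOpened() {
        if (openRegistries++ == 0) {
            reaper = new Thread(ReaperManager::reapLoop, "reaper-sketch");
            reaper.setDaemon(true);
            reaper.start();
        }
    }

    // Called when a registry is closed; stops the reaper on the 1 -> 0 transition.
    static synchronized void registryClosed() {
        if (openRegistries == 0) {
            throw new IllegalStateException("no open registries");
        }
        if (--openRegistries == 0) {
            reaper.interrupt();
            reaper = null;
        }
    }

    static synchronized boolean isReaperRunning() {
        return reaper != null;
    }

    // Placeholder loop; a real reaper would block on a reference queue instead of sleeping.
    private static void reapLoop() {
        try {
            while (true) {
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // interrupted by registryClosed(): exit the thread
        }
    }
}
```

With this shape, a new task arriving after the count reached zero simply triggers the 0 -> 1 transition again and gets a fresh reaper thread.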

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3228

          To clarify: The "static reaper thread" change is a good idea, but the change seems a bit incomplete without a well-defined shutdown of the thread.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3228

          I agree about the first part.

          About the reaper thread, I was also concerned about stopping and the counter is not a fix: there can be 0 registries but at some point a new task could come up, right?

          The reaper is now a daemon thread, so it will go down with the JVM, before which all streams get closed anyways. Or am I missing something?
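
As a small illustration of the daemon-thread point, here is a sketch with hypothetical names (`DaemonReaperDemo`, `startIdleDaemon`). A daemon thread does not keep the JVM from exiting, but it stays alive for as long as the JVM itself keeps running.

```java
import java.util.concurrent.LinkedBlockingQueue;

class DaemonReaperDemo {
    // Starts a daemon thread that blocks indefinitely, standing in for an idle reaper.
    static Thread startIdleDaemon() {
        LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        Thread t = new Thread(() -> {
            try {
                queue.take();  // nothing is ever offered, so this blocks until interrupted
            } catch (InterruptedException e) {
                // exit when interrupted
            }
        }, "idle-reaper-sketch");
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) {
        Thread t = startIdleDaemon();
        System.out.println("daemon=" + t.isDaemon() + " alive=" + t.isAlive());
        // main returns here; the JVM exits although the daemon thread never finished.
    }
}
```

In a JVM that runs many jobs in sequence, such a thread would remain alive between runs unless something stops it explicitly.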

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3228

          Ok, maybe what we are talking about is respawning the reaper thread if we go again from 0 to 1. That is possible.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3228

          Concerning the reaper thread:

          • It is not really broken in the release branch; it only has a few more threads than necessary.
          • So far we have strived to make sure Flink does not leave any lingering Threads at all (as validated by the MiniCluster thread), because it actually messes up testing setups from many users that repeatedly execute programs with the LocalEnvironment. That would be good to keep.
          • One can probably stop the thread when all registries are closed and re-spawn it when new registries come. That would be

          In summary: I think lingering threads are a type of regression, actually. Introducing that for something that is not broken right now is something I would not do for a release. Especially given that there is probably a cleaner solution that can both implement the improvement and not have the lingering threads regression. Let's do that for the 1.2.1/1.3 release.

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/3229

          FLINK-5663 Prevent leaking SafetyNetCloseableRegistry through Inher…

          This PR prevents the SafetyNetCloseableRegistry from leaking into pooled threads through InheritableThreadLocal.

          As a first step, we use ThreadLocal instead of InheritableThreadLocal to hold the closeable registries.

          Additionally, we also create safety nets for the file system at the scope of the checkpointing thread. We hope that this already covers most cases. Other threads could actually also create safety nets for their scope right now.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink safetyNetThreadLocal

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3229.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3229


          commit 31ae956fdd83a2f65bf22c3ad601d4d65ad61439
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-01-27T18:47:12Z

          FLINK-5663 Prevent leaking SafetyNetCloseableRegistry through InheritableThreadLocal


          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3228

          Closed in favour of #3229 and #3230.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter closed the pull request at:

          https://github.com/apache/flink/pull/3228

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3229

          Looks good to me, merging this for `1.2` and `master`...

          githubbot ASF GitHub Bot added a comment -

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3229

          Thank you for reviewing the PR!
          I'll create RC3 afterwards.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3229

          StephanEwen Stephan Ewen added a comment -

          Fixed in

          • 1.2.0 via 617ff50c6103aa5d5354b6339531dc38d62127ec
          • 1.3.0 via 50b665677831529ee492ceda40a3c8fd750d62ff

            People

            • Assignee:
              srichter Stefan Richter
              Reporter:
              uce Ufuk Celebi