FLINK-5214

Clean up checkpoint files when failing checkpoint operation on TM

    Details

      Description

      When the StreamTask#performCheckpoint operation fails on a TaskManager, checkpoint files that may already have been created are not cleaned up. This should be changed.

          Activity

          xiaogang.shi Xiaogang Shi added a comment -

          I opened FLINK-5086 to report a similar problem, but I do not have a good idea how to resolve it.

          Because the JM does not know about the existence of these checkpoint files, it seems only the TM can delete them. But since a failed TM may not be recovered by the JM if the number of retries exceeds the given limit, these files will not be deleted in such cases.

          One possible solution, I think, is to let each TM return a handler to the JM when the TM is registered. The JM can use the handler to clean up the files even when the TM fails.

          Another solution is to recover the TM when the number of retries exceeds the limit. Once the TM is recovered, the only thing it does is to clean the checkpoint files.

          Do you have any better ideas?
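
The first proposal above (each TM hands the JM a cleanup handler at registration) could look roughly like the following. This is only a minimal sketch of the idea, not Flink code: `CheckpointCleanupRegistry`, `CleanupHandle`, `forDirectory`, `registerTaskManager` and `onTaskManagerFailed` are all hypothetical names, and it assumes the checkpoint files live in a directory that the JM can reach as well.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

/** Hypothetical JobManager-side registry of per-TaskManager cleanup handles. */
final class CheckpointCleanupRegistry {

    /** Hypothetical handle a TaskManager hands over when it registers. */
    interface CleanupHandle {
        void discardPendingCheckpointFiles() throws IOException;
    }

    /** Builds a handle that deletes a checkpoint directory (assumed reachable from the JM). */
    static CleanupHandle forDirectory(Path checkpointDir) {
        return () -> {
            if (!Files.exists(checkpointDir)) {
                return; // nothing was written, nothing to clean up
            }
            try (Stream<Path> paths = Files.walk(checkpointDir)) {
                // Reverse order so that files are deleted before their parent directories.
                paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                    try {
                        Files.delete(p);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
            } catch (UncheckedIOException e) {
                throw e.getCause();
            }
        };
    }

    private final Map<String, CleanupHandle> handles = new ConcurrentHashMap<>();

    /** Called when a TaskManager registers. */
    void registerTaskManager(String taskManagerId, CleanupHandle handle) {
        handles.put(taskManagerId, handle);
    }

    /** Called when a TaskManager is given up on; returns true if a handle was found and run. */
    boolean onTaskManagerFailed(String taskManagerId) throws IOException {
        CleanupHandle handle = handles.remove(taskManagerId);
        if (handle == null) {
            return false;
        }
        handle.discardPendingCheckpointFiles();
        return true;
    }
}
```

The handle is removed from the registry before it runs, so a repeated failure notification for the same TM does not trigger a second cleanup attempt.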

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/2918

          FLINK-5214 Clean up checkpoint data in case of a failing checkpoint operation

          Adds exception handling to the stream operators for the snapshotState method. If an
          exception occurs while performing the snapshot operation, all data checkpointed up to
          that point will be discarded/deleted. This makes sure that a failing checkpoint operation
          won't leave orphaned checkpoint data (e.g. files) behind.

          Add test case for FsCheckpointStateOutputStream

          Add RocksDB FullyAsyncSnapshot cleanup test

          Add proper state cleanup tests for window operator

          Add state cleanup test for failing snapshot call of AbstractUdfStreamOperator
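
The exception handling described in this pull request can be illustrated with a small stand-alone sketch. It is not the code from the PR: `SnapshotCleanupSketch`, `StateHandle` and `SnapshotFunction` are hypothetical stand-ins, and the only point shown is that everything created before the failure is discarded before the exception is rethrown.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of "discard what was written so far if the snapshot fails". */
final class SnapshotCleanupSketch {

    /** Stand-in for one piece of checkpointed state, e.g. a file. */
    interface StateHandle {
        void discard() throws Exception;
    }

    /** Stand-in for the snapshot logic; it registers every handle it creates. */
    interface SnapshotFunction {
        void snapshot(List<StateHandle> created) throws Exception;
    }

    static List<StateHandle> snapshotState(SnapshotFunction function) throws Exception {
        List<StateHandle> created = new ArrayList<>();
        try {
            function.snapshot(created);
            return created;
        } catch (Exception snapshotFailure) {
            // Discard everything that was checkpointed before the failure.
            for (StateHandle handle : created) {
                try {
                    handle.discard();
                } catch (Exception discardFailure) {
                    // Keep the original failure as the primary exception.
                    snapshotFailure.addSuppressed(discardFailure);
                }
            }
            throw snapshotFailure;
        }
    }
}
```

Failures during cleanup are attached as suppressed exceptions, so the caller still sees the original cause of the failed snapshot.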

          cc @StephanEwen

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink fixTaskCheckpointFailure

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2918.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2918


          commit 35fc74dd501fc49aa0b55f415c85c2140206220a
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-12-01T12:25:05Z

          FLINK-5214 Clean up checkpoint data in case of a failing checkpoint operation

          Adds exception handling to the stream operators for the snapshotState method. In case of an
          exception while performing the snapshot operation, all until then checkpointed data will
          be discarded/deleted. This makes sure that a failing checkpoint operation won't leave
          orphaned checkpoint data (e.g. files) behind.

          Add test case for FsCheckpointStateOutputStream

          Add RocksDB FullyAsyncSnapshot cleanup test

          Add proper state cleanup tests for window operator

          Add state cleanup test for failing snapshot call of AbstractUdfStreamOperator


          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2918

          till.rohrmann Till Rohrmann added a comment -

          Hi Xiaogang Shi, you're right that in case of a TM crash there might be orphaned checkpoint files left which are not properly cleaned up. I think what you describe in FLINK-5086 could be a viable solution to the problem. This is actually the follow-up to this issue, which covers the case of a recoverable exception.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3178

          FLINK-5214 Clean up checkpoint data in case of a failing checkpoint operation

          Adds exception handling to the stream operators for the snapshotState method. A failing
          snapshot operation will trigger the clean up of all state resources generated so far.
          This prevents a failing snapshot operation from leaving resources (e.g. files) behind.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink taskOperatorStateCleanup

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3178.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3178


          commit c1a597320dbe3c7f4514297d5ad7f2b8f416e287
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-12-01T12:25:05Z

          FLINK-5214 Clean up checkpoint data in case of a failing checkpoint operation

          Adds exception handling to the stream operators for the snapshotState method. A failing
          snapshot operation will trigger the clean up of all so far generated state resources.
          This will avoid that in case of a failing snapshot operation resources (e.g. files) are
          left behind.

          Add test case for OperatorSnapshotResult

          Add StateSnapshotContextSynchronousImplTest

          Add AbstractStreamOperator failing snapshot tests


          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3183

          [backport] FLINK-5214 FLINK-5229 Backport StreamTask and StreamOperator checkpoint cleanup

          Backport of #3179 and #3178 onto the `release-1.2` branch.

          Adds exception handling to the stream operators for the snapshotState method. A failing
          snapshot operation will trigger the clean up of all state resources generated so far.
          This prevents a failing snapshot operation from leaving resources (e.g. files) behind.

          This PR adds operator state cleanup to the StreamTask class. If a stream task contains multiple
          stream operators, then every operator is checkpointed. If a snapshot operation fails, all
          state handles and OperatorSnapshotResults belonging to previous operators have to be freed.
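
The StreamTask-level cleanup described above can be sketched as follows. This is an illustration under assumptions, not the backported code: `OperatorChainCheckpointSketch`, `Operator` and `SnapshotResult` are hypothetical stand-ins (the latter playing the role of an operator's snapshot result), and the sketch only shows that results of earlier operators are freed when a later operator fails.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: checkpoint every operator of a task, free earlier results on failure. */
final class OperatorChainCheckpointSketch {

    /** Stand-in for the result of one operator's snapshot. */
    interface SnapshotResult {
        void cancel() throws Exception;
    }

    /** Stand-in for a stream operator that can be checkpointed. */
    interface Operator {
        SnapshotResult snapshot(long checkpointId) throws Exception;
    }

    static List<SnapshotResult> checkpointAll(List<Operator> operators, long checkpointId)
            throws Exception {
        List<SnapshotResult> results = new ArrayList<>(operators.size());
        try {
            for (Operator operator : operators) {
                results.add(operator.snapshot(checkpointId));
            }
            return results;
        } catch (Exception failure) {
            // Free the results of all operators that were snapshotted before the failing one.
            for (SnapshotResult result : results) {
                try {
                    result.cancel();
                } catch (Exception cancelFailure) {
                    failure.addSuppressed(cancelFailure);
                }
            }
            throw failure;
        }
    }
}
```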

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink backportStateCleanup

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3183.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3183


          commit a0edef186c49520e353fe6cdc3321ef208e1bb3b
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-12-01T12:25:05Z

          FLINK-5214 Clean up checkpoint data in case of a failing checkpoint operation

          Adds exception handling to the stream operators for the snapshotState method. A failing
          snapshot operation will trigger the clean up of all so far generated state resources.
          This will avoid that in case of a failing snapshot operation resources (e.g. files) are
          left behind.

          Add test case for OperatorSnapshotResult

          Add StateSnapshotContextSynchronousImplTest

          Add AbstractStreamOperator failing snapshot tests

          commit 5eb4c2ff00a3818c53bac6c440d83bff0be8501a
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-01-20T13:28:44Z

          FLINK-5229 [state] Cleanup of operator snapshots if subsequent operator snapshots fail

          This PR adds operator state cleanup to the StreamTask class. If a stream task contains multiple
          stream operators, then every operator is checkpointed. In case that a snapshot operation fails
          all state handles and OperatorSnapshotResults belonging to previous operators have to be freed.

          Add test cases for failing checkpoint operations in StreamTask


          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3178#discussion_r97282223

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
          @@ -342,21 +343,50 @@ public final OperatorSnapshotResult snapshotState(
           StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
           checkpointId, timestamp, streamFactory, keyGroupRange, getContainingTask().getCancelables());

          - snapshotState(snapshotContext);
          + try {
          --- End diff --

          Here we could use try-with-resources if `SnapshotContext` implements `AutoCloseable`.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3178#discussion_r97279484

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---
          @@ -127,4 +128,38 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
           return new DoneFuture<>(stream.closeAndGetHandle());
           }

          -}
          \ No newline at end of file
          + private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
          +     Preconditions.checkNotNull(stream);
          +
          +     closableRegistry.unregisterClosable(stream.getDelegate());
          +     stream.getDelegate().close();
          + }
          +
          + public void close() throws IOException {
          --- End diff --

          If we already introduce a close method, we could also make this implement `AutoCloseable` and use it in try-with-resources?

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3178#discussion_r97279062

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java ---
          @@ -21,8 +21,9 @@
           import org.apache.flink.annotation.PublicEvolving;

           /**
          - * This interface provides a context in which operators that use managed (i.e. state that is managed by state
          - * backends) or raw (i.e. the operator can write it's state streams) state can perform a snapshot.
          + * This interface provides a context in which operators that use managed (i.e. state that is managed
          --- End diff --

          All changes in this file are unrelated/reformatting.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3178#discussion_r97312291

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java ---
          @@ -21,8 +21,9 @@
           import org.apache.flink.annotation.PublicEvolving;

           /**
          - * This interface provides a context in which operators that use managed (i.e. state that is managed by state
          - * backends) or raw (i.e. the operator can write it's state streams) state can perform a snapshot.
          + * This interface provides a context in which operators that use managed (i.e. state that is managed
          --- End diff --

          True. Will revert them.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3178#discussion_r97313258

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---
          @@ -127,4 +128,38 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
           return new DoneFuture<>(stream.closeAndGetHandle());
           }

          -}
          \ No newline at end of file
          + private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
          +     Preconditions.checkNotNull(stream);
          +
          +     closableRegistry.unregisterClosable(stream.getDelegate());
          +     stream.getDelegate().close();
          + }
          +
          + public void close() throws IOException {
          --- End diff --

          Good idea. I will let it implement the `Closeable` interface

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3178#discussion_r97313842

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
          @@ -342,21 +343,50 @@ public final OperatorSnapshotResult snapshotState(
           StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
           checkpointId, timestamp, streamFactory, keyGroupRange, getContainingTask().getCancelables());

          - snapshotState(snapshotContext);
          + try {
          --- End diff --

          Yes, you're right. Will refactor the code that way

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3178

          Thanks for the review @StefanRRichter. I'll address your comments and then merge this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3178#discussion_r97314933

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---
          @@ -127,4 +128,38 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
           return new DoneFuture<>(stream.closeAndGetHandle());
           }

          -}
          \ No newline at end of file
          + private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
          +     Preconditions.checkNotNull(stream);
          +
          +     closableRegistry.unregisterClosable(stream.getDelegate());
          +     stream.getDelegate().close();
          + }
          +
          + public void close() throws IOException {
          --- End diff --

          I would prefer `AutoCloseable` over `Closeable`, because the former is general while the latter is more related to IO.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3178#discussion_r97323947

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---
          @@ -127,4 +128,38 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
           return new DoneFuture<>(stream.closeAndGetHandle());
           }

          -}
          \ No newline at end of file
          + private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
          +     Preconditions.checkNotNull(stream);
          +
          +     closableRegistry.unregisterClosable(stream.getDelegate());
          +     stream.getDelegate().close();
          + }
          +
          + public void close() throws IOException {
          --- End diff --

          But isn't it better to be as specific as possible? In this case we wouldn't unnecessarily widen the thrown exception.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3178#discussion_r97324700

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---
          @@ -127,4 +128,38 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
           return new DoneFuture<>(stream.closeAndGetHandle());
           }

          -}
          \ No newline at end of file
          + private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
          +     Preconditions.checkNotNull(stream);
          +
          +     closableRegistry.unregisterClosable(stream.getDelegate());
          +     stream.getDelegate().close();
          + }
          +
          + public void close() throws IOException {
          --- End diff --

          Of course, but you can and should always pick a more specific exception on the signature of your implementation. If `IOException` or a subclass is appropriate here, then you could also go with `Closeable` of course.
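
The point of this exchange can be seen in a small stand-alone example. `java.io.Closeable` extends `AutoCloseable` and declares `close() throws IOException`, while an `AutoCloseable` implementation is free to narrow the `throws Exception` of the interface to `IOException` itself; either way the class works in try-with-resources and callers only have to handle `IOException`. The class names below (`CloseableContextSketch`, `SnapshotContext`, `NarrowedContext`) are hypothetical and not the classes from the PR.

```java
import java.io.Closeable;
import java.io.IOException;

/** Hypothetical sketch comparing Closeable with an AutoCloseable that narrows its exception. */
final class CloseableContextSketch {

    /** Variant 1: Closeable, whose close() is declared with IOException. */
    static final class SnapshotContext implements Closeable {
        private boolean closed;

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() throws IOException {
            closed = true; // a real context would close and unregister its streams here
        }
    }

    /** Variant 2: AutoCloseable, with the implementation narrowing Exception to IOException. */
    static final class NarrowedContext implements AutoCloseable {
        private boolean closed;

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() throws IOException {
            closed = true;
        }
    }

    /** The context is closed on both the normal and the exceptional path. */
    static void runSnapshot(SnapshotContext context, boolean fail) throws IOException {
        try (SnapshotContext ctx = context) {
            if (fail) {
                throw new IOException("snapshot failed");
            }
        }
    }

    /** Compiles with 'throws IOException' only because NarrowedContext narrowed close(). */
    static NarrowedContext useNarrowed() throws IOException {
        NarrowedContext context = new NarrowedContext();
        try (NarrowedContext ctx = context) {
            // snapshot work would go here
        }
        return context;
    }
}
```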

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3178

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3183

          Merged manually into `release-1.2` branch.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann closed the pull request at:

          https://github.com/apache/flink/pull/3183

          till.rohrmann Till Rohrmann added a comment -

          1.3.0: e458975756e137ae2abb94e09fd92578ecd739bc
          1.2.0: 006fcc4443f837d1384b8e85f9fb6dd048d2f743
          1.1.4: 9c058871f778f748059829b1b350687e3f789f6f


            People

            • Assignee: Till Rohrmann
            • Reporter: Till Rohrmann
            • Votes: 0
            • Watchers: 4
