Details

      Description

      Currently, externalizing checkpoint metadata and storing savepoints depends strictly on FileSystems.

          Since state backends are more general, storing and cleaning up checkpoints through state backend hooks requires untying savepoints and externalized checkpoints from file systems.


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StephanEwen opened a pull request:

          https://github.com/apache/flink/pull/3411

          FLINK-5897 & FLINK-5822 First step towards Generic State Backends and Global State Cleanup Hooks

          *This is the first part of a larger parent issue: Self-contained externalized checkpoints and global cleanup hooks.*

          Parts of the changes may seem incomplete because they are preparation for later changes. To avoid overly large pull requests, this is a first part that is stable on its own and compatible with the prior behavior.

          High-level changes

          1. The Checkpoint Coordinator knows about the base state backend. That is the first step towards generic storage of checkpoints (not file system specific) and global cleanup hooks (rather than, for example, tracking the cleanup of each file individually).

          2. The CompletedCheckpoint no longer assumes that it is stored on a FileSystem; it now holds a `StreamStateHandle` to its metadata (if externalized) and a generic external pointer. For a checkpoint on a FileSystem, this pointer is the file path.
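To make the shape of this change concrete, here is a minimal Java sketch using simplified stand-ins: `MetadataHandle`, `FileMetadataHandle`, and `CompletedCheckpointSketch` are hypothetical names, not Flink's API. It shows a completed checkpoint that holds a nullable metadata handle plus an opaque pointer, and derives the file-system pointer as the directory containing the metadata file, mirroring `metadataHandle.getFilePath().getParent().toString()` in this pull request's `PendingCheckpoint` changes. The constructor check (an externalized checkpoint must carry both metadata and a pointer) is an assumption for illustration.

```java
import java.nio.file.Path;
import java.util.Objects;

// Hypothetical stand-in for a StreamStateHandle: a handle to serialized checkpoint metadata.
interface MetadataHandle {
    long getStateSize();
}

// Hypothetical file-backed handle (loosely modeled on a file state handle).
final class FileMetadataHandle implements MetadataHandle {
    private final Path filePath;
    private final long stateSize;

    FileMetadataHandle(Path filePath, long stateSize) {
        this.filePath = Objects.requireNonNull(filePath, "filePath");
        this.stateSize = stateSize;
    }

    Path getFilePath() {
        return filePath;
    }

    @Override
    public long getStateSize() {
        return stateSize;
    }
}

// Simplified completed checkpoint that does not assume a file system.
final class CompletedCheckpointSketch {
    private final long checkpointId;
    private final MetadataHandle externalMetadata; // null if not externalized
    private final String externalPointer;          // null if not externalized

    CompletedCheckpointSketch(long checkpointId, boolean shouldBeExternalized,
                              MetadataHandle externalMetadata, String externalPointer) {
        // Assumed validation: an externalized checkpoint needs both metadata and a pointer.
        if (shouldBeExternalized && (externalMetadata == null || externalPointer == null)) {
            throw new IllegalArgumentException(
                    "Checkpoint " + checkpointId + " should be externalized but has no metadata/pointer");
        }
        this.checkpointId = checkpointId;
        this.externalMetadata = externalMetadata;
        this.externalPointer = externalPointer;
    }

    long getCheckpointId() { return checkpointId; }

    boolean isExternalized() { return externalMetadata != null; }

    String getExternalPointer() { return externalPointer; }

    // For file-system checkpoints, the pointer is the directory that contains the metadata file.
    static String fileSystemPointer(FileMetadataHandle handle) {
        Path parent = Objects.requireNonNull(
                handle.getFilePath().getParent(), "metadata file has no parent directory");
        return parent.toString();
    }
}
```

For example, a metadata file at `/checkpoints/chk-17/_metadata` (the file name is an assumption) would yield the pointer `/checkpoints/chk-17`, while a non-externalized checkpoint simply carries `null` for both fields.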

          Detailed changes
          • Moves the logic for loading a state backend from the configuration out of the `StreamTask` and into the `AbstractStateBackend`, because the JobManager and TaskManager now share the same logic.
          • Adds tests for the loading behavior of state backends
          • Improves the Exception signatures of state backend loading
          • Allows CompletedCheckpointStores to specify whether they require externalized checkpoints.
            That is important for the next step, where the ZooKeeper store only stores pointers and does not externalize the metadata an additional time.
          • `CompletedCheckpoint` holds a pointer to its metadata
          • CheckpointCoordinator externalizes the checkpoint explicitly (rather than the pending checkpoint doing so implicitly).
          • More comments and JavaDocs
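The first bullet moves configuration-based backend loading into one place that both the JobManager and TaskManager can call. Below is a minimal sketch of such a shared loader, assuming a plain `Map<String, String>` configuration. The keys `state.backend` and `state.backend.fs.checkpointdir`, the short names `jobmanager` and `filesystem`, the memory default, and all `*Sketch` types are illustrative assumptions rather than the exact `AbstractStateBackend` code.

```java
import java.util.Map;

// Hypothetical minimal backend abstraction used only for this sketch.
interface BackendSketch {
    String describe();
}

final class MemoryBackendSketch implements BackendSketch {
    @Override public String describe() { return "memory"; }
}

final class FsBackendSketch implements BackendSketch {
    private final String checkpointDir;

    FsBackendSketch(String checkpointDir) { this.checkpointDir = checkpointDir; }

    @Override public String describe() { return "filesystem:" + checkpointDir; }
}

// Hypothetical factory contract for backends referenced by class name.
interface BackendFactorySketch {
    BackendSketch createFromConfig(Map<String, String> config);
}

final class StateBackendLoaderSketch {
    // Assumed configuration keys; illustrative only.
    static final String BACKEND_KEY = "state.backend";
    static final String FS_DIR_KEY = "state.backend.fs.checkpointdir";

    private StateBackendLoaderSketch() {}

    /** Single entry point that "JobManager" and "TaskManager" code would both call. */
    static BackendSketch load(Map<String, String> config, ClassLoader classLoader) {
        String name = config.get(BACKEND_KEY);
        if (name == null || name.equalsIgnoreCase("jobmanager")) {
            return new MemoryBackendSketch(); // assumed default when nothing is configured
        }
        if (name.equalsIgnoreCase("filesystem")) {
            String dir = config.get(FS_DIR_KEY);
            if (dir == null) {
                throw new IllegalArgumentException("Missing " + FS_DIR_KEY + " for the filesystem backend");
            }
            return new FsBackendSketch(dir);
        }
        // Any other value is treated as the class name of a factory.
        try {
            Class<? extends BackendFactorySketch> clazz =
                    Class.forName(name, false, classLoader).asSubclass(BackendFactorySketch.class);
            return clazz.getDeclaredConstructor().newInstance().createFromConfig(config);
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot load state backend factory '" + name + "'", e);
        }
    }
}
```

Keeping one loader avoids the JobManager and TaskManager drifting apart in how they interpret the same configuration, which is the motivation given in the bullet above.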
          Tests

          Most of the changes only make existing parts more generic and need no additional tests.

          Additional tests were added for passing the state backend from the program to the checkpoint coordinator, and for loading the state backend from the configuration.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StephanEwen/incubator-flink statebackend

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3411.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3411


          commit 5689aba6f2ae0363e7e36ef5d920fdae88d5b5cc
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2017-02-17T16:51:00Z

          FLINK-5822 [state backends] Make JobManager / Checkpoint Coordinator aware of the root state backend

          commit 16c1f5afaabd0cff58afe5086ae0aabc82441072
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2017-02-22T21:18:50Z

          FLINK-5897 [checkpoints] Make checkpoint externalization not depend strictly on FileSystems

          That is the first step towards checkpoints that can be externalized to other stores as well,
          like k/v stores and databases, if supported by the state backend.
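As an illustration of what externalizing to other stores could look like, here is a hypothetical storage contract in which the backend, not a file system, decides where metadata lives and what the pointer means. `CheckpointMetadataStorage` and `InMemoryKvMetadataStorage` are invented for this sketch, the `kv://` pointer format is an assumption, and the in-memory map only stands in for a real k/v store or database. `discard` plays the role of a global cleanup hook that removes a checkpoint by its pointer rather than file by file.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical storage contract: where checkpoint metadata goes is up to the backend.
interface CheckpointMetadataStorage {
    /** Stores the metadata and returns an opaque external pointer to it. */
    String store(long checkpointId, byte[] metadata);

    /** Resolves a pointer previously returned by store(). */
    Optional<byte[]> load(String pointer);

    /** Global cleanup hook: removes everything belonging to the checkpoint in one call. */
    void discard(String pointer);
}

// Stand-in for a key/value store or database; pointers are keys, not file paths.
final class InMemoryKvMetadataStorage implements CheckpointMetadataStorage {
    private final Map<String, byte[]> table = new ConcurrentHashMap<>();
    private final String namespace;

    InMemoryKvMetadataStorage(String namespace) {
        this.namespace = namespace;
    }

    @Override
    public String store(long checkpointId, byte[] metadata) {
        String pointer = "kv://" + namespace + "/chk-" + checkpointId; // assumed pointer format
        table.put(pointer, metadata.clone());
        return pointer;
    }

    @Override
    public Optional<byte[]> load(String pointer) {
        byte[] bytes = table.get(pointer);
        return bytes == null ? Optional.empty() : Optional.of(bytes.clone());
    }

    @Override
    public void discard(String pointer) {
        table.remove(pointer);
    }
}
```

A file-system implementation of the same contract would return a directory path as the pointer and delete that directory in `discard`; callers only ever see the opaque string.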


          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/3411

          The test failure is an unrelated instability that I've seen over the weekend as well (https://issues.apache.org/jira/browse/FLINK-5923).

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3411#discussion_r103169593

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java —
          @@ -72,4 +72,5 @@
          */
          int getNumberOfRetainedCheckpoints();

          + boolean requiresExternalizedCheckpoints();
          — End diff –

          Should we add a short JavaDocs comment?
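A possible JavaDoc, sketched on a simplified stand-in interface; `CheckpointStoreSketch`, `PointerOnlyStoreSketch`, `InMemoryStoreSketch`, and `FinalizationPolicySketch` are hypothetical names. The rule in `shouldExternalize` (externalize when either the checkpoint properties ask for it or the store requires it) is an assumption about how the flag would be used, based on the pull request's note that the ZooKeeper store will only keep pointers.

```java
/** Simplified stand-in for the completed checkpoint store contract. */
interface CheckpointStoreSketch {

    /**
     * Returns whether this store can only recover checkpoints whose metadata has been
     * externalized. A store that keeps only pointers (for example, in ZooKeeper) needs
     * the metadata to be written elsewhere; a purely in-memory store does not.
     *
     * @return true if every checkpoint added to this store must be externalized
     */
    boolean requiresExternalizedCheckpoints();
}

// Hypothetical store that persists only pointers, so it needs externalized metadata.
final class PointerOnlyStoreSketch implements CheckpointStoreSketch {
    @Override public boolean requiresExternalizedCheckpoints() { return true; }
}

// Hypothetical store that keeps full checkpoints in memory.
final class InMemoryStoreSketch implements CheckpointStoreSketch {
    @Override public boolean requiresExternalizedCheckpoints() { return false; }
}

final class FinalizationPolicySketch {
    private FinalizationPolicySketch() {}

    /** Decides which finalize path a coordinator would take (assumed rule). */
    static boolean shouldExternalize(boolean propsRequestExternalization, CheckpointStoreSketch store) {
        return propsRequestExternalization || store.requiresExternalizedCheckpoints();
    }
}
```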

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3411#discussion_r103169078

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java —
          @@ -100,7 +107,9 @@
            * accessing this don't block the job manager actor and run asynchronously. */
          private final CompletedCheckpointStore completedCheckpointStore;
          - /** Default directory for persistent checkpoints; <code>null</code> if none configured. */
          + /** Default directory for persistent checkpoints; <code>null</code> if none configured.
          +  * THIS WILL BE REPLACED BY PROPER STATE-BACKEND METADATA WRITING */
          — End diff –

          Very good! :+1: Will you file follow-up JIRAs for this?

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3411#discussion_r103170089

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java —
          @@ -21,20 +21,50 @@
          import org.apache.flink.annotation.PublicEvolving;
          import org.apache.flink.api.common.JobID;
          import org.apache.flink.api.common.typeutils.TypeSerializer;
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.configuration.CoreOptions;
          +import org.apache.flink.configuration.IllegalConfigurationException;
          import org.apache.flink.runtime.execution.Environment;
          import org.apache.flink.runtime.query.TaskKvStateRegistry;
          +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
          +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
          +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
          +import org.apache.flink.util.DynamicCodeLoadingException;
          +
          +import org.slf4j.Logger;

          import javax.annotation.Nullable;
          import java.io.IOException;

          +import static org.apache.flink.util.Preconditions.checkNotNull;
          +
          /**
           * An abstract base implementation of the {@link StateBackend}
           * interface.
          + *
          + * <p>
          — End diff –

          Leftover comment

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3411#discussion_r103192397

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java —
          @@ -203,48 +208,67 @@ void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
              return onCompletionPromise;
            }

          - public CompletedCheckpoint finalizeCheckpoint() {
          + public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException {
          +
              synchronized (lock) {
          -     Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
          -
          -     // Persist if required
          -     String externalPath = null;
          -     if (props.externalizeCheckpoint()) {
          -       try {
          -         Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
          -         externalPath = SavepointStore.storeSavepoint(
          -             targetDirectory,
          -             savepoint
          -         );
          -       } catch (IOException e) {
          -         LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
          -       }
          -     }
          +     checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");

          -     CompletedCheckpoint completed = new CompletedCheckpoint(
          -         jobId,
          -         checkpointId,
          -         checkpointTimestamp,
          -         System.currentTimeMillis(),
          -         new HashMap<>(taskStates),
          -         props,
          -         externalPath);
          +     // externalize the metadata
          +     final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());

          -     onCompletionPromise.complete(completed);
          +     // TEMP FIX - The savepoint store is strictly typed to file systems currently
          +     // but the checkpoints think more generic. we need to work with file handles
          +     // here until the savepoint serializer accepts a generic stream factory

          -     if (statsCallback != null) {
          -       // Finalize the statsCallback and give the completed checkpoint a
          -       // callback for discards.
          -       CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath);
          -       completed.setDiscardCallback(discardCallback);
          -     }
          +     final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
          +     final String externalPointer = metadataHandle.getFilePath().getParent().toString();

          -     dispose(false);
          +     return finalizeInternal(metadataHandle, externalPointer);
          +   }
          + }
          +
          + public CompletedCheckpoint finalizeCheckpointNonExternalized() {
          +   synchronized (lock) {
          +     checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");

          -     return completed;
          +     // finalize without external metadata
          +     return finalizeInternal(null, null);
              }
            }

          + @GuardedBy("lock")
          + private CompletedCheckpoint finalizeInternal(
          +     @Nullable StreamStateHandle externalMetadata,
          +     @Nullable String externalPointer) {
          +
          +   assert(Thread.holdsLock(lock));
          +
          +   CompletedCheckpoint completed = new CompletedCheckpoint(
          +       jobId,
          +       checkpointId,
          +       checkpointTimestamp,
          +       System.currentTimeMillis(),
          +       new HashMap<>(taskStates),
          +       props,
          +       externalMetadata,
          +       externalPointer);
          +
          +   onCompletionPromise.complete(completed);
          — End diff –

          If the creation of the `CompletedCheckpoint` fails (for example, because the external metadata is null although the properties say the checkpoint should have been externalized), the promise is never completed. I think we should wrap it in a try/catch and fail the promise in that case.
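One way to address this, sketched with `java.util.concurrent.CompletableFuture` standing in for the promise type used in `PendingCheckpoint`; `PendingCheckpointSketch` and its `Supplier`-based `finalizeInternal` are hypothetical simplifications, not the merged code. If building the completed checkpoint throws, the promise is failed before the exception propagates, so anyone waiting on it is notified.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical, simplified pending checkpoint showing only the promise handling.
final class PendingCheckpointSketch<T> {
    private final Object lock = new Object();
    private final CompletableFuture<T> onCompletionPromise = new CompletableFuture<>();

    CompletableFuture<T> getCompletionFuture() {
        return onCompletionPromise;
    }

    /** Builds the completed checkpoint; on failure, fails the promise and rethrows. */
    T finalizeInternal(Supplier<T> completedCheckpointFactory) {
        synchronized (lock) {
            final T completed;
            try {
                completed = completedCheckpointFactory.get();
            } catch (RuntimeException e) {
                // Without this, waiters on the promise would never be notified.
                onCompletionPromise.completeExceptionally(e);
                throw e;
            }
            onCompletionPromise.complete(completed);
            return completed;
        }
    }
}
```

Rethrowing after failing the promise keeps the caller's error handling unchanged while still releasing anyone blocked on the future.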

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3411

          Thanks for the review, @uce - addressing the comments and merging this...

          StephanEwen Stephan Ewen added a comment -

          Fixed in 5b7f21d891b410ca0046efdaf12caf5e73deadf4

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3411


            People

            • Assignee:
              StephanEwen Stephan Ewen
              Reporter:
              StephanEwen Stephan Ewen
            • Votes:
              0
              Watchers:
              2
