Flink / FLINK-5265

Introduce state handle replication mode for CheckpointCoordinator

    Details

      Description

      Currently, the CheckpointCoordinator only supports repartitioning of {{OperatorStateHandle}}s based on a split-and-distribute strategy. For future state types, such as broadcast or union state, we need a different repartitioning method that allows for replicating state handles to all subtasks.

      This is the first step on the way to implementing broadcast and union states.
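The difference between the two repartitioning strategies can be illustrated with a minimal Java sketch. It is a simplified model, not Flink's actual CheckpointCoordinator code: the class `RepartitionSketch`, its methods, and the plain string "handles" are hypothetical. The round-robin assignment used for split-and-distribute is also an assumption; Flink's real repartitioner may assign partitions differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, simplified model of the two repartitioning strategies
 * discussed in FLINK-5265. Not Flink's actual classes or API.
 */
public class RepartitionSketch {

    /** Split-and-distribute: each partition goes to exactly one subtask (round-robin assumed). */
    static <T> List<List<T>> splitDistribute(List<T> partitions, int parallelism) {
        List<List<T>> result = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            result.get(i % parallelism).add(partitions.get(i));
        }
        return result;
    }

    /** Replication (broadcast/union style): every subtask receives all partitions. */
    static <T> List<List<T>> replicate(List<T> partitions, int parallelism) {
        List<List<T>> result = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            result.add(new ArrayList<>(partitions)); // each subtask gets its own full copy of the list
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> handles = Arrays.asList("h0", "h1", "h2", "h3", "h4");
        System.out.println(splitDistribute(handles, 2)); // [[h0, h2, h4], [h1, h3]]
        System.out.println(replicate(handles, 2));       // [[h0, h1, h2, h3, h4], [h0, h1, h2, h3, h4]]
    }
}
```

With five handles and a parallelism of 2, split-and-distribute gives each handle to exactly one subtask, while replication hands all five handles to both subtasks, which is the behaviour broadcast or union state would need on restore.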


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/2948

          FLINK-5265 Introduce state handle replication mode for CheckpointCoordinator

          Currently, the ``CheckpointCoordinator`` only supports repartitioning of ``OperatorStateHandle``s based on a split-and-distribute strategy. For future state types, such as broadcast or union state, we need a different repartitioning method that allows for replicating state handles to all subtasks.

          This functionality is introduced with this PR and represents the first step on the way to implementing broadcast and union states.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink broadcast-op-state

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2948.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2948



          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/2948

          CC @aljoscha @StephanEwen

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2948#discussion_r95979296

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateCheckpointOutputStream.java ---
          @@ -66,8 +66,10 @@ OperatorStateHandle closeAndGetHandle() throws IOException
          { startNewPartition(); }
          - Map<String, long[]> offsetsMap = new HashMap<>(1);
          - offsetsMap.put(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, partitionOffsets.toArray());
          + Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>(1);
          + OperatorStateHandle.StateMetaInfo metaInfo =
          +         new OperatorStateHandle.StateMetaInfo(partitionOffsets.toArray(), OperatorStateHandle.Mode.BROADCAST);
          --- End diff ---

          I think this should be `SPLIT_REDISTRIBUTE` to conform to the old behaviour.
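The review point can be illustrated with a small model of per-state metadata: each named state carries its partition offsets plus a distribution mode, and the default operator state written by the checkpoint output stream keeps the old split-and-redistribute behaviour. This is a hypothetical sketch, not Flink's `OperatorStateHandle.StateMetaInfo`; the class names, the `_default_` placeholder standing in for `DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME`, and the `Mode` constants (taken from the discussion above) are assumptions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of per-state metadata; not Flink's actual OperatorStateHandle. */
public class StateMetaInfoSketch {

    /** Distribution modes as named in the review discussion (assumed, not Flink's enum). */
    enum Mode { SPLIT_REDISTRIBUTE, BROADCAST }

    /** Offsets of the state's partitions in the checkpoint stream, plus how to distribute them on restore. */
    static final class StateMetaInfo {
        private final long[] offsets;
        private final Mode distributionMode;

        StateMetaInfo(long[] offsets, Mode distributionMode) {
            this.offsets = offsets.clone(); // defensive copy so callers cannot mutate the stored offsets
            this.distributionMode = distributionMode;
        }

        long[] getOffsets() { return offsets.clone(); }

        Mode getDistributionMode() { return distributionMode; }
    }

    // Hypothetical placeholder for DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME.
    static final String DEFAULT_STATE_NAME = "_default_";

    /** Builds the state-name -> meta-info map, keeping the old split-and-redistribute behaviour. */
    static Map<String, StateMetaInfo> buildOffsetsMap(long[] partitionOffsets) {
        Map<String, StateMetaInfo> offsetsMap = new HashMap<>(1);
        offsetsMap.put(DEFAULT_STATE_NAME, new StateMetaInfo(partitionOffsets, Mode.SPLIT_REDISTRIBUTE));
        return offsetsMap;
    }

    public static void main(String[] args) {
        Map<String, StateMetaInfo> map = buildOffsetsMap(new long[] {0L, 128L, 256L});
        StateMetaInfo info = map.get(DEFAULT_STATE_NAME);
        // prints "SPLIT_REDISTRIBUTE [0, 128, 256]"
        System.out.println(info.getDistributionMode() + " " + Arrays.toString(info.getOffsets()));
    }
}
```

Tagging the mode per state name, rather than per handle, lets one checkpoint mix split-distributed and replicated states; using `BROADCAST` here would silently change how existing default operator state is restored.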

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2948#discussion_r95991932


          Yes, that is correct. Will change this as suggested.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2948

          Thanks for your work! 👍 I merged it. Could you please close this PR?

          aljoscha Aljoscha Krettek added a comment -

          Lower-level code merged to master in:
          1020ba2c9cfc1d01703e97c72e20a922bae0732d

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter closed the pull request at:

          https://github.com/apache/flink/pull/2948

          aljoscha Aljoscha Krettek added a comment -

          Implemented on release-1.2 in:
          29fbc49bf2df34bed64c128d2d8b74656ed27fc8


            People

            • Assignee:
              srichter Stefan Richter
              Reporter:
              srichter Stefan Richter
            • Votes:
              0
              Watchers:
              3
