Details

      Description

      Currently, the only type of snapshot for keyed streams is KeyGroupsStateHandle, which is a full snapshot that stores the states one key group after another. With the introduction of incremental checkpointing, we need a higher-level abstraction of keyed snapshots to allow flexible snapshot formats.

      Implementations of KeyedStateHandle may vary considerably across backends. The only information a KeyedStateHandle must provide is its key group range. When a job is recovered with a different degree of parallelism, each KeyedStateHandle is assigned to every subtask whose key group range overlaps with the handle's range.
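A minimal Java sketch of the assignment rule described above. `KeyGroupRange` and `KeyedStateHandle` here are simplified stand-ins, not Flink's actual classes, and `NamedHandle` is hypothetical. The contiguous split in `rangeForSubtask` is an assumed scheme for illustration; Flink's own computation may differ. Rescaling from parallelism 2 to 3 over 8 key groups assigns both old handles to the middle subtask, because its range straddles them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RescaleAssignmentSketch {
    public static void main(String[] args) {
        // Old job: parallelism 2 over 8 key groups, so one handle each for [0, 3] and [4, 7].
        List<KeyedStateHandle> handles = Arrays.asList(
                new NamedHandle("A", new KeyGroupRange(0, 3)),
                new NamedHandle("B", new KeyGroupRange(4, 7)));

        // New job: parallelism 3 over the same 8 key groups.
        for (int i = 0; i < 3; i++) {
            KeyGroupRange subtaskRange = rangeForSubtask(8, 3, i);
            System.out.println("subtask " + i + " " + subtaskRange + " <- " + overlapping(handles, subtaskRange));
        }
    }

    // Assumed contiguous split of key groups [0, maxParallelism) over the subtasks.
    static KeyGroupRange rangeForSubtask(int maxParallelism, int parallelism, int index) {
        int start = (index * maxParallelism + parallelism - 1) / parallelism;
        int end = ((index + 1) * maxParallelism - 1) / parallelism;
        return new KeyGroupRange(start, end);
    }

    // A handle is assigned to every subtask whose range overlaps the handle's range.
    static List<KeyedStateHandle> overlapping(List<? extends KeyedStateHandle> handles, KeyGroupRange subtaskRange) {
        List<KeyedStateHandle> assigned = new ArrayList<>();
        for (KeyedStateHandle handle : handles) {
            if (!handle.getKeyGroupRange().intersect(subtaskRange).isEmpty()) {
                assigned.add(handle);
            }
        }
        return assigned;
    }
}

// Simplified stand-in for Flink's KeyGroupRange: inclusive [start, end], empty if end < start.
final class KeyGroupRange {
    final int start;
    final int end;

    KeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    boolean isEmpty() {
        return end < start;
    }

    KeyGroupRange intersect(KeyGroupRange other) {
        return new KeyGroupRange(Math.max(start, other.start), Math.min(end, other.end));
    }

    @Override
    public String toString() {
        return isEmpty() ? "[]" : "[" + start + ", " + end + "]";
    }
}

// Restore only needs the key group range; the storage format stays backend-specific.
interface KeyedStateHandle {
    KeyGroupRange getKeyGroupRange();
}

// Hypothetical handle type used only for this example.
final class NamedHandle implements KeyedStateHandle {
    private final String name;
    private final KeyGroupRange range;

    NamedHandle(String name, KeyGroupRange range) {
        this.name = name;
        this.range = range;
    }

    @Override
    public KeyGroupRange getKeyGroupRange() {
        return range;
    }

    @Override
    public String toString() {
        return name;
    }
}
```

Running `main` should print `subtask 0 [0, 2] <- [A]`, `subtask 1 [3, 5] <- [A, B]` and `subtask 2 [6, 7] <- [B]`.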


          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang closed the pull request at:

          https://github.com/apache/flink/pull/3531

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on the issue:

          https://github.com/apache/flink/pull/3531

          @StefanRRichter Thanks for your work. I will close the PR.

          srichter Stefan Richter added a comment -

          Implemented in cd5527417a1cae57073a8855c6c3b88c88c780aa

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3531

          Merged in cd5527417a1cae57073a8855c6c3b88c88c780aa. @shixiaogang can you please close the PR?

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3531

          Thanks for this very nice contribution @shixiaogang! I will merge this now.

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on the issue:

          https://github.com/apache/flink/pull/3531

          @StefanRRichter I updated the PR as suggested. Many thanks for your hard work.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3531

          @shixiaogang I had a few more comments on the updated PR. When they are resolved, I think this can be merged immediately.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3531#discussion_r107877948

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotResult.java —
          @@ -30,7 +31,7 @@
          */
          public class OperatorSnapshotResult {

          - private RunnableFuture<KeyGroupsStateHandle> keyedStateManagedFuture;
          + private RunnableFuture<KeyedStateHandle> keyedStateManagedFuture;
          private RunnableFuture<KeyGroupsStateHandle> keyedStateRawFuture;
          — End diff –

          I think this generic type can also be changed to the higher-level interface `RunnableFuture<KeyedStateHandle>`.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3531#discussion_r107876739

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java —
          @@ -57,6 +58,7 @@
          import org.mockito.invocation.InvocationOnMock;
          import org.mockito.stubbing.Answer;

          +import javax.swing.plaf.basic.BasicSplitPaneUI;
          — End diff –

          This import should not be here.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3531#discussion_r107874660

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java —
          @@ -0,0 +1,40 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.state;
          +
          +/**
          + * Base for the handles of the checkpointed states in keyed streams. When
          + * recovering from failures, the handle will be passed to all tasks whose key
          + * group ranges overlap with it.
          + */
          +public interface KeyedStateHandle extends StateObject {
          +
          + /**
          + * Returns the range of the key groups contained in the state.
          + */
          + KeyGroupRange getKeyGroupRange();
          +
          + /**
          + * Returns a keyed state handle which contains the states for the given
          — End diff –

          This comment is not correct. It does not necessarily contain the whole given range, but the intersection of the given range with the range of the handle. You can just copy-paste it from the method in `KeyGroupsStateHandle`.
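To illustrate the contract requested here, a sketch with a simplified `KeyGroupRange` and a hypothetical `RangeOnlyHandle`. The Javadoc wording below is a paraphrase, not the text from `KeyGroupsStateHandle`. A handle over [0, 9] that is asked for [5, 14] returns a handle over [5, 9]; in this sketch a non-overlapping request yields a handle with an empty range.

```java
class IntersectionContractSketch {
    public static void main(String[] args) {
        KeyedStateHandle handle = new RangeOnlyHandle(new KeyGroupRange(0, 9));
        // Asking for [5, 14] yields [5, 9]: only the part this handle actually holds.
        System.out.println(handle.getIntersection(new KeyGroupRange(5, 14)).getKeyGroupRange());
    }
}

// Simplified stand-in for Flink's KeyGroupRange: inclusive [start, end], empty if end < start.
final class KeyGroupRange {
    final int start;
    final int end;

    KeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    boolean isEmpty() {
        return end < start;
    }

    KeyGroupRange intersect(KeyGroupRange other) {
        return new KeyGroupRange(Math.max(start, other.start), Math.min(end, other.end));
    }

    @Override
    public String toString() {
        return isEmpty() ? "[]" : "[" + start + ", " + end + "]";
    }
}

interface KeyedStateHandle {
    /** Returns the range of the key groups contained in the state. */
    KeyGroupRange getKeyGroupRange();

    /**
     * Returns a keyed state handle that contains the state for the intersection of the
     * given key group range with this handle's own key group range (possibly empty),
     * not necessarily for the whole given range.
     */
    KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange);
}

// Hypothetical handle that only tracks its range; a real handle would also point to state data.
final class RangeOnlyHandle implements KeyedStateHandle {
    private final KeyGroupRange range;

    RangeOnlyHandle(KeyGroupRange range) {
        this.range = range;
    }

    @Override
    public KeyGroupRange getKeyGroupRange() {
        return range;
    }

    @Override
    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
        return new RangeOnlyHandle(range.intersect(keyGroupRange));
    }
}
```

Running `main` should print `[5, 9]`.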

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3531#discussion_r107874353

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java —
          @@ -761,6 +769,13 @@ private void restoreKVStateMetaData() throws IOException, ClassNotFoundException
          private void restoreKVStateData() throws IOException, RocksDBException {
          //for all key-groups in the current state handle...
          for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
          + int keyGroup = keyGroupOffset.f0;
          +
          + // Skip those key groups that do not belong to the backend
          + if (!rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup)) {
          — End diff –

          The same comment I had on the `HeapKeyedStateBackend` also applies here: I think the post-filter is no longer required after the change I suggested for `StateAssignmentOperation`.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3531#discussion_r107873583

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java —
          @@ -412,9 +421,15 @@ private void restorePartitionedState(Collection<KeyGroupsStateHandle> state) thr
          }
          }

          - for (Tuple2<Integer, Long> groupOffset : keyGroupsHandle.getGroupRangeOffsets()) {
          + for (Tuple2<Integer, Long> groupOffset : keyGroupsStateHandle.getGroupRangeOffsets()) {
          int keyGroupIndex = groupOffset.f0;
          long offset = groupOffset.f1;
          +
          + // Skip those key groups that don't belong to the backend.
          + if (!keyGroupRange.contains(keyGroupIndex)) {
          — End diff –

          I think this is no longer required, because we are now "cutting out" the right key-groups from each state handle in the `StateAssignmentOperation`, using the `KeyedStateHandle::getIntersection(...)` method. In fact, I think that receiving a key-group that is not in the backend's range could now be considered an error. We could rewrite this as a precondition check or an assert.
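A sketch of the suggested precondition in plain Java, with a simplified `KeyGroupRange` and a hypothetical `restore` method standing in for the backend's restore loop (seeking to offsets and reading the stream are elided). Flink code would more likely use its `Preconditions.checkState` helper; an `IllegalStateException` is thrown directly here to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;

class RestorePreconditionSketch {
    public static void main(String[] args) {
        KeyGroupRange backendRange = new KeyGroupRange(4, 7);
        System.out.println(restore(backendRange, Arrays.asList(4, 5, 6, 7))); // prints 4
        try {
            restore(backendRange, Arrays.asList(3, 4));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Key group 3 is not in the backend's range [4, 7]
        }
    }

    // Hypothetical restore loop: the assignment step already intersected each handle with the
    // backend's range, so a foreign key group signals a bug and fails fast instead of being skipped.
    static int restore(KeyGroupRange backendRange, List<Integer> keyGroupsInHandle) {
        int restored = 0;
        for (int keyGroup : keyGroupsInHandle) {
            if (!backendRange.contains(keyGroup)) {
                throw new IllegalStateException(
                        "Key group " + keyGroup + " is not in the backend's range " + backendRange);
            }
            restored++; // placeholder for seeking to the key group's offset and reading its state
        }
        return restored;
    }
}

// Simplified stand-in for Flink's KeyGroupRange: inclusive [start, end].
final class KeyGroupRange {
    final int start;
    final int end;

    KeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    boolean contains(int keyGroup) {
        return keyGroup >= start && keyGroup <= end;
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + "]";
    }
}
```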

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3531#discussion_r107870871

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java —
          @@ -290,19 +291,19 @@ private static void assignTaskStatesToOperatorInstances(

          * <p>
          * <p>This is publicly visible to be used in tests.
          */
          - public static List<KeyGroupsStateHandle> getKeyGroupsStateHandles(
          - Collection<KeyGroupsStateHandle> allKeyGroupsHandles,
          - KeyGroupRange subtaskKeyGroupIds) {
          + public static List<KeyedStateHandle> getKeyedStateHandles(
          + Collection<? extends KeyedStateHandle> keyedStateHandles,
          + KeyGroupRange subtaskKeyGroupRange) {
          - List<KeyGroupsStateHandle> subtaskKeyGroupStates = new ArrayList<>();
          + List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();
          - for (KeyGroupsStateHandle storedKeyGroup : allKeyGroupsHandles) {
          - KeyGroupsStateHandle intersection = storedKeyGroup.getKeyGroupIntersection(subtaskKeyGroupIds);
          - if (intersection.getNumberOfKeyGroups() > 0) {
          - subtaskKeyGroupStates.add(intersection);
          - }
          + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
          + KeyedStateHandle intersectedKeyedStateHandle = keyedStateHandle.getIntersection(subtaskKeyGroupRange);
          +
          + subtaskKeyedStateHandles.add(intersectedKeyedStateHandle);
          — End diff –

          I think we should only add a state handle if the key group range from the intersection is non-empty. Even though those empty handles are probably ignored later, I think it is cleaner.
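A sketch of `getKeyedStateHandles` with the suggested non-empty check, using simplified stand-ins for `KeyGroupRange` and `KeyedStateHandle` plus a hypothetical `RangeHandle`. The `null` check is defensive, covering an implementation that might return `null` for an empty intersection; whether Flink's handles do so is an assumption left open here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

class FilterEmptyIntersectionsSketch {
    public static void main(String[] args) {
        List<KeyedStateHandle> handles = Arrays.asList(
                new RangeHandle(new KeyGroupRange(0, 3)),
                new RangeHandle(new KeyGroupRange(4, 7)));
        // Subtask range [0, 2] only overlaps the first handle, so one handle is returned.
        System.out.println(getKeyedStateHandles(handles, new KeyGroupRange(0, 2)).size()); // prints 1
    }

    static List<KeyedStateHandle> getKeyedStateHandles(
            Collection<? extends KeyedStateHandle> keyedStateHandles,
            KeyGroupRange subtaskKeyGroupRange) {

        List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();
        for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
            KeyedStateHandle intersected = keyedStateHandle.getIntersection(subtaskKeyGroupRange);
            // Keep only handles that contribute at least one key group to this subtask.
            if (intersected != null && intersected.getKeyGroupRange().getNumberOfKeyGroups() > 0) {
                subtaskKeyedStateHandles.add(intersected);
            }
        }
        return subtaskKeyedStateHandles;
    }
}

// Simplified stand-in for Flink's KeyGroupRange: inclusive [start, end], empty if end < start.
final class KeyGroupRange {
    final int start;
    final int end;

    KeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    int getNumberOfKeyGroups() {
        return Math.max(0, end - start + 1);
    }

    KeyGroupRange intersect(KeyGroupRange other) {
        return new KeyGroupRange(Math.max(start, other.start), Math.min(end, other.end));
    }
}

interface KeyedStateHandle {
    KeyGroupRange getKeyGroupRange();

    KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange);
}

// Hypothetical handle that only tracks its key group range.
final class RangeHandle implements KeyedStateHandle {
    private final KeyGroupRange range;

    RangeHandle(KeyGroupRange range) {
        this.range = range;
    }

    @Override
    public KeyGroupRange getKeyGroupRange() {
        return range;
    }

    @Override
    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
        return new RangeHandle(range.intersect(keyGroupRange));
    }
}
```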

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3531#discussion_r107704681

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java —
          @@ -91,10 +98,10 @@ public KeyGroupsStateHandle getKeyGroupIntersection(KeyGroupRange keyGroupRange)

          /**
          *

          - * @return the internal key-group range to offsets metadata
          + * @return the start key group in the key-group range of this handle
          */
          - public KeyGroupRangeOffsets getGroupRangeOffsets() {
          - return groupRangeOffsets;
          + public int getStartKeyGroup() {
          — End diff –

          I have removed all pass-through methods except `getGroupRangeOffsets()` because `StateInitializationContextImpl$KeyGroupStreamIterator` is using it to get the iterator for key groups and their offsets.

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on the issue:

          https://github.com/apache/flink/pull/3531

          @StefanRRichter Thanks a lot for your comments. I have updated the pull request as suggested, changing the type of raw keyed state to `KeyedStateHandle` as well.

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3531#discussion_r107646547

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java —
          @@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
          }

          /**
          + * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
          + * key group index for the given subtask {@link KeyGroupRange}.
          + * <p>
          + * <p>This is publicly visible to be used in tests.
          + */
          + public static List<KeyedStateHandle> getKeyedStateHandles(
          — End diff –

          +1. Will update the PR as suggested.

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3531#discussion_r107646429

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java —
          @@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
          }

          /**
          + * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
          + * key group index for the given subtask {@link KeyGroupRange}.
          + * <p>
          + * <p>This is publicly visible to be used in tests.
          + */
          + public static List<KeyedStateHandle> getKeyedStateHandles(
          + Collection<? extends KeyedStateHandle> keyedStateHandles,
          + KeyGroupRange subtaskKeyGroupRange) {
          +
          + List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();
          +
          + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
          + KeyGroupRange intersection = keyedStateHandle.getKeyGroupRange().getIntersection(subtaskKeyGroupRange);
          — End diff –

          The idea is great! It does make sense to allow a `KeyedStateHandle` to create a new `KeyedStateHandle` for the state of the sub-range. I will update the PR as suggested.
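One way a `KeyGroupsStateHandle`-like handle could create a handle for a sub-range is to share the underlying stream and slice only the per-key-group offset metadata. The `OffsetsHandle` class and its fields below are hypothetical simplifications, not Flink's `KeyGroupsStateHandle` or `KeyGroupRangeOffsets`.

```java
import java.util.Arrays;

class OffsetSlicingSketch {
    public static void main(String[] args) {
        // Handle over key groups [4, 7]; each entry is the stream offset where that key group starts.
        OffsetsHandle handle = new OffsetsHandle(4, new long[] {0L, 120L, 250L, 400L}, "file-1");
        OffsetsHandle sub = handle.getIntersection(6, 9);
        System.out.println(sub); // file-1 [6, 7] offsets=[250, 400]
    }
}

// Hypothetical handle: one stream plus one offset per key group in
// [startKeyGroup, startKeyGroup + offsets.length - 1].
final class OffsetsHandle {
    final int startKeyGroup;
    final long[] offsets;
    final String streamName;

    OffsetsHandle(int startKeyGroup, long[] offsets, String streamName) {
        this.startKeyGroup = startKeyGroup;
        this.offsets = offsets;
        this.streamName = streamName;
    }

    int endKeyGroup() {
        return startKeyGroup + offsets.length - 1;
    }

    // Returns a handle over the same stream, restricted to [from, to] intersected with this
    // handle's range. Only the offset metadata is sliced; no state bytes are copied.
    OffsetsHandle getIntersection(int from, int to) {
        int start = Math.max(from, startKeyGroup);
        int end = Math.min(to, endKeyGroup());
        if (start > end) {
            return new OffsetsHandle(start, new long[0], streamName);
        }
        long[] slice = Arrays.copyOfRange(offsets, start - startKeyGroup, end - startKeyGroup + 1);
        return new OffsetsHandle(start, slice, streamName);
    }

    @Override
    public String toString() {
        return streamName + " [" + startKeyGroup + ", " + endKeyGroup() + "] offsets=" + Arrays.toString(offsets);
    }
}
```

Restoring key group 6 from the sub-handle still seeks to offset 250 in `file-1`, so creating the sub-handle is cheap regardless of how much state the stream holds.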

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3531#discussion_r107411558

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java —
          @@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
          }

          /**
          + * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
          + * key group index for the given subtask {@link KeyGroupRange}.
          + * <p>
          + * <p>This is publicly visible to be used in tests.
          + */
          + public static List<KeyedStateHandle> getKeyedStateHandles(
          — End diff –

          Could we completely remove `getKeyGroupsStateHandles()` and replace all its uses with this method, which works on `KeyedStateHandle`? This would also mean using only `KeyedStateHandle` for the raw keyed state that is checkpointed into streams (without a backend). Otherwise, this is almost duplicated code. What do you think?
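The `Collection<? extends KeyedStateHandle>` parameter in the diff is what lets a single method serve both kinds of keyed state: a `List<KeyGroupsStateHandle>` is not a `Collection<KeyedStateHandle>`, but it is a `Collection<? extends KeyedStateHandle>`. A minimal sketch with simplified stand-in types and a hypothetical `totalKeyGroups` method:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

class WildcardSketch {
    public static void main(String[] args) {
        List<KeyGroupsStateHandle> rawKeyedState = Arrays.asList(new KeyGroupsStateHandle(0, 3));
        List<KeyedStateHandle> managedKeyedState = Arrays.asList(new KeyGroupsStateHandle(4, 7));
        // Both lists are accepted by the same method; prints 8.
        System.out.println(totalKeyGroups(rawKeyedState) + totalKeyGroups(managedKeyedState));
    }

    // With a Collection<KeyedStateHandle> parameter instead, passing rawKeyedState would not compile.
    static int totalKeyGroups(Collection<? extends KeyedStateHandle> handles) {
        int total = 0;
        for (KeyedStateHandle handle : handles) {
            total += handle.getKeyGroupRange().getNumberOfKeyGroups();
        }
        return total;
    }
}

// Simplified stand-ins; the real Flink types carry much more information.
final class KeyGroupRange {
    final int start;
    final int end; // inclusive

    KeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    int getNumberOfKeyGroups() {
        return Math.max(0, end - start + 1);
    }
}

interface KeyedStateHandle {
    KeyGroupRange getKeyGroupRange();
}

final class KeyGroupsStateHandle implements KeyedStateHandle {
    private final KeyGroupRange range;

    KeyGroupsStateHandle(int startKeyGroup, int endKeyGroup) {
        this.range = new KeyGroupRange(startKeyGroup, endKeyGroup);
    }

    @Override
    public KeyGroupRange getKeyGroupRange() {
        return range;
    }
}
```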

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3531#discussion_r107409091

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateHandle.java —
          @@ -0,0 +1,33 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.state;
          +
          +/**
          + * A snapshot in keyed streams. Each {@code KeyedStateHandle} contains the
          — End diff –

          A state handle typically means a pointer to some state. It will mostly be used for snapshots of state backends, but conceptually this is a different level of abstraction. I suggest slightly modifying this comment, moving away from the concrete implementations of snapshots to the concept of having a pointer to serialized keyed state.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3531#discussion_r107409859

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java —
          @@ -91,10 +98,10 @@ public KeyGroupsStateHandle getKeyGroupIntersection(KeyGroupRange keyGroupRange)

          /**
          *

          - * @return the internal key-group range to offsets metadata
          + * @return the start key group in the key-group range of this handle
          */
          - public KeyGroupRangeOffsets getGroupRangeOffsets() {
          - return groupRangeOffsets;
          + public int getStartKeyGroup() {
          — End diff –

          I suggest removing this pass-through method, as it is at a very low abstraction level. This information can already be obtained from the result of `getKeyGroupRange()`. Now that we expose the `KeyGroupRange`, `getNumberOfKeyGroups()` could also be removed, as we can ask the `KeyGroupRange` directly.
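          A minimal sketch of this point, assuming a simplified, hypothetical `SimpleKeyGroupRange` that (like Flink's `KeyGroupRange`) models an inclusive range of key-group indices: a handle that exposes only its range already gives callers the start key group and the number of key groups, so pass-through getters on the handle add nothing. `RangeOnlyHandle` is likewise a hypothetical illustration, not a Flink class.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for a key-group range: inclusive [start, end]. */
final class SimpleKeyGroupRange {
    private final int startKeyGroup;
    private final int endKeyGroup;

    SimpleKeyGroupRange(int startKeyGroup, int endKeyGroup) {
        // Reject negative starts and inverted ranges; this sketch does not model empty ranges.
        if (startKeyGroup < 0 || endKeyGroup < startKeyGroup) {
            throw new IllegalArgumentException(
                "Invalid range: [" + startKeyGroup + ", " + endKeyGroup + "]");
        }
        this.startKeyGroup = startKeyGroup;
        this.endKeyGroup = endKeyGroup;
    }

    int getStartKeyGroup() { return startKeyGroup; }

    int getEndKeyGroup() { return endKeyGroup; }

    // Both ends are inclusive, hence the +1.
    int getNumberOfKeyGroups() { return endKeyGroup - startKeyGroup + 1; }
}

/** Hypothetical handle that exposes only its range; callers derive start and count from it. */
final class RangeOnlyHandle {
    private final SimpleKeyGroupRange keyGroupRange;

    RangeOnlyHandle(SimpleKeyGroupRange keyGroupRange) {
        this.keyGroupRange = Objects.requireNonNull(keyGroupRange);
    }

    SimpleKeyGroupRange getKeyGroupRange() { return keyGroupRange; }
}
```

          A caller would then write `handle.getKeyGroupRange().getStartKeyGroup()` instead of calling a dedicated getter on the handle.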

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3531#discussion_r107410978

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java —
          @@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
           }

           /**
          + * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
          + * key group index for the given subtask {@link KeyGroupRange}.
          + * <p>
          + * <p>This is publicly visible to be used in tests.
          + */
          + public static List<KeyedStateHandle> getKeyedStateHandles(
          + Collection<? extends KeyedStateHandle> keyedStateHandles,
          + KeyGroupRange subtaskKeyGroupRange) {
          +
          + List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();
          +
          + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
          + KeyGroupRange intersection = keyedStateHandle.getKeyGroupRange().getIntersection(subtaskKeyGroupRange);
          — End diff –

          I wonder if we could somehow introduce a `KeyedStateHandle::intersect(KeyGroupRange)` that again returns a `KeyedStateHandle` with a `KeyGroupRange` that is the intersection of the original range and the argument. Basically, this would be a higher-level version of what the `KeyGroupsStateHandle` can already do, and the concrete implementations (like `KeyGroupsStateHandle`) would know how to virtually split themselves up into a sub-range. This would also transfer less data in the RPC (fewer offsets) and save the post-filtering in the backend.
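          The proposed `intersect` could look roughly like the following sketch for an offsets-based handle. `Range`, `OffsetsHandle`, and its `intersect` method are hypothetical simplifications, not Flink classes; the point is that the handle returns a narrowed copy carrying only the offsets of the overlapping key groups, or `null` when nothing overlaps.

```java
import java.util.Arrays;

/** Hypothetical inclusive key-group range [start, end]; empty when end < start. */
final class Range {
    final int start;
    final int end;

    Range(int start, int end) {
        this.start = start;
        this.end = end;
    }

    boolean isEmpty() { return end < start; }

    /** Overlap of two ranges; the result is empty (end < start) if they do not overlap. */
    Range intersect(Range other) {
        return new Range(Math.max(start, other.start), Math.min(end, other.end));
    }
}

/** Hypothetical handle storing one stream offset per key group. */
final class OffsetsHandle {
    final Range range;
    final long[] offsets; // offsets[i] belongs to key group (range.start + i)

    OffsetsHandle(Range range, long[] offsets) {
        if (offsets.length != range.end - range.start + 1) {
            throw new IllegalArgumentException("Need exactly one offset per key group");
        }
        this.range = range;
        this.offsets = offsets;
    }

    /**
     * Returns a handle restricted to the overlap with subtaskRange, carrying only the
     * offsets of the overlapping key groups, or null if the ranges do not overlap.
     */
    OffsetsHandle intersect(Range subtaskRange) {
        Range overlap = range.intersect(subtaskRange);
        if (overlap.isEmpty()) {
            return null;
        }
        int from = overlap.start - range.start;
        int to = overlap.end - range.start + 1; // exclusive bound for copyOfRange
        return new OffsetsHandle(overlap, Arrays.copyOfRange(offsets, from, to));
    }
}
```

          Returning `null` for an empty overlap is a choice made for this sketch; an empty handle or an `Optional` would work as well.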

          Otherwise, we could have a boolean method that just checks for intersection, because there is no need to create `KeyGroupRange` objects anymore: we do not actually use them.
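          For the boolean alternative, a minimal allocation-free check on two inclusive ranges could look like this. The class and method names are hypothetical. Two non-empty ranges overlap exactly when each one starts no later than the other one ends.

```java
/** Hypothetical helper for checking key-group range overlap without allocating objects. */
final class KeyGroupOverlap {
    private KeyGroupOverlap() {}

    /** True if the inclusive ranges [start1, end1] and [start2, end2] share a key group. */
    static boolean overlaps(int start1, int end1, int start2, int end2) {
        // An empty range (end < start) never overlaps anything.
        if (end1 < start1 || end2 < start2) {
            return false;
        }
        return start1 <= end2 && start2 <= end1;
    }
}
```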

          githubbot ASF GitHub Bot added a comment -

          GitHub user shixiaogang opened a pull request:

          https://github.com/apache/flink/pull/3531

          FLINK-6034 [checkpoint] Add KeyedStateHandle for the snapshots in keyed streams

          Changes
          • Add `KeyedStateHandle` for the snapshots in keyed streams. `KeyGroupsStateHandle` is now one of its implementations.
          • Distribute `KeyedStateHandle`s to subtasks according to their key group ranges. A `KeyedStateHandle` will be assigned to all subtasks whose key group ranges overlap with its range.
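          The assignment rule in the second bullet can be sketched as follows. `GroupRange`, `KeyedHandle`, and `HandleAssignment` are hypothetical stand-ins rather than Flink's actual classes; the sketch only assumes, as the issue description states, that the key group range is all the assignment needs from a handle.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical inclusive key-group range [start, end]. */
final class GroupRange {
    final int start;
    final int end;

    GroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    /** True if the two inclusive ranges share at least one key group. */
    boolean overlaps(GroupRange other) {
        return start <= other.end && other.start <= end;
    }
}

/** Hypothetical minimal handle abstraction: only the key-group range is exposed. */
interface KeyedHandle {
    GroupRange getKeyGroupRange();
}

final class HandleAssignment {
    private HandleAssignment() {}

    /** For each subtask range, collects every handle whose range overlaps it. */
    static List<List<KeyedHandle>> assign(
            List<? extends KeyedHandle> handles, List<GroupRange> subtaskRanges) {
        List<List<KeyedHandle>> result = new ArrayList<>();
        for (GroupRange subtaskRange : subtaskRanges) {
            List<KeyedHandle> assigned = new ArrayList<>();
            for (KeyedHandle handle : handles) {
                if (handle.getKeyGroupRange().overlaps(subtaskRange)) {
                    assigned.add(handle); // the same handle may go to several subtasks
                }
            }
            result.add(assigned);
        }
        return result;
    }
}
```

          For example, two handles covering key groups 0-63 and 64-127, reassigned to three subtasks with ranges 0-42, 43-85, and 86-127, would give the middle subtask both handles, because its range overlaps each of them.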

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/shixiaogang/flink flink-6034

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3531.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3531


          commit 9637dcc40d66a2702f5227b7bbe3ae66fca89adf
          Author: xiaogang.sxg <xiaogang.sxg@alibaba-inc.com>
          Date: 2017-03-14T11:04:37Z

          Add KeyedStateHandle for the snapshots in keyed streams



            People

            • Assignee:
              xiaogang.shi Xiaogang Shi
              Reporter:
              xiaogang.shi Xiaogang Shi
            • Votes:
              0
              Watchers:
              4
