
Introduce key groups for key-value state to support dynamic scaling

    Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.2.0
    • Component/s: None
    • Labels:
      None

      Description

      In order to support dynamic scaling, it is necessary to sub-partition the key-value states of each operator. This sub-partitioning, which produces a set of key groups, makes it easy to scale Flink jobs in and out by simply reassigning the key groups to the new set of subtasks. The idea of key groups is described in this design document [1].

      [1] https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing

        Issue Links

        There are no Sub-Tasks for this issue.

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/2376

          FLINK-3755 Introduce key groups for key-value state to support dynamic scaling

          This pull request introduces the concept of key groups to Flink. A key group is the smallest unit of key-value state that can be assigned to a stream operator. Put differently, it is a subset of the key space which can be assigned to a stream operator. With the introduction of key groups, it will be possible to dynamically scale Flink operators that use partitioned (= key-value) state.

          In particular, this pull request addresses the following sub-issues:

          • fully: FLINK-4380 Introduce KeyGroupAssigner and Max-Parallelism Parameter
          • partially: FLINK-4381 Refactor State to Prepare For Key-Group State Backends

          Furthermore, this pull request is partially based on pull request: #1988

          Overall, this pull request introduces the following changes:

          1) Adopted from #1988 (plus introduction of distributing keys as ranges (`KeyGroupRange`))
            a) Introduction of KeyGroupAssigner

          In order to partition keys into key groups, the `KeyGroupAssigner` interface is introduced. This allows for partitioning the key space into smaller units which can be assigned to operators. Scaling the parallelism up or down is then performed by simply reassigning the key groups to more or fewer operators.

          For this pull request, the former `HashPartitioner` is renamed to `KeyGroupStreamPartitioner` and uses the `KeyGroupAssigner` to distribute the streaming records consistently with the key group mappings. The key groups, in turn, are mapped as ranges of key groups (`key group index * parallelism / number of key groups` = out-going channel) to the downstream tasks.

          When restoring from a checkpoint or savepoint, scaling the parallelism up or down basically boils down to splitting or merging the key group ranges in line with the adjusted assignment to operators, which happens automatically through the `KeyGroupStreamPartitioner`.
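The range mapping above can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions: the class and method names are hypothetical (not the classes of this pull request), and the bit mixing in `assignToKeyGroup` is a stand-in for whatever hash function the implementation uses. Only the channel formula is taken from the description.

```java
import java.util.Arrays;

/** Illustrative sketch only; class and method names are hypothetical, not Flink's API. */
public final class KeyGroupSketch {

    /** Maps a key to a key group in [0, numberOfKeyGroups). The bit mixing is an assumed stand-in hash. */
    public static int assignToKeyGroup(Object key, int numberOfKeyGroups) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        return Math.floorMod(h, numberOfKeyGroups);
    }

    /** Out-going channel: key group index * parallelism / number of key groups (integer division). */
    public static int operatorIndexForKeyGroup(int keyGroupIndex, int parallelism, int numberOfKeyGroups) {
        return keyGroupIndex * parallelism / numberOfKeyGroups;
    }

    /** Computes the out-going channel of every key group for the given parallelism. */
    public static int[] channels(int parallelism, int numberOfKeyGroups) {
        int[] result = new int[numberOfKeyGroups];
        for (int keyGroup = 0; keyGroup < numberOfKeyGroups; keyGroup++) {
            result[keyGroup] = operatorIndexForKeyGroup(keyGroup, parallelism, numberOfKeyGroups);
        }
        return result;
    }

    public static void main(String[] args) {
        // With 8 key groups, doubling the parallelism splits each contiguous range in two.
        System.out.println(Arrays.toString(channels(2, 8))); // [0, 0, 0, 0, 1, 1, 1, 1]
        System.out.println(Arrays.toString(channels(4, 8))); // [0, 0, 1, 1, 2, 2, 3, 3]
    }
}
```

Because the key-to-key-group step does not depend on the parallelism, rescaling only changes which contiguous range of key groups each task owns.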

            b) Introduction of MaxParallelism to user API

          In order to scale programs up or down, it is necessary to define the maximum number of key groups. The maximum number of key groups denotes the maximum parallelism of an operator, because every operator needs at least one key group to get elements assigned. To specify this upper limit, a job-wide max parallelism value can be set via ExecutionConfig.setMaxParallelism. In addition, the SingleOutputStreamOperator allows setting a max parallelism value per operator, analogous to the parallelism. If neither the operator-level nor the job-wide max parallelism has been set, the parallelism of the operator is taken as the max parallelism, so every parallel operator instance receives a single key group. Currently, we restrict the maximum number of key groups to roughly 2^15 (Short.MAX_VALUE, i.e. 2^15 - 1).
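The fallback order described above can be sketched as follows. The helper `resolveMaxParallelism` and the `UNSET` marker are hypothetical names introduced for illustration. The final check assumes that a parallelism above the max parallelism is rejected, which follows from every operator needing at least one key group.

```java
/** Illustrative sketch only; names are hypothetical, not Flink's API. */
public final class MaxParallelismSketch {

    /** Marker for "not configured" (an assumption of this sketch). */
    public static final int UNSET = -1;

    /** Operator-level setting wins, then the job-wide setting, then the operator's parallelism. */
    public static int resolveMaxParallelism(int operatorMax, int jobMax, int parallelism) {
        int resolved;
        if (operatorMax > 0) {
            resolved = operatorMax;
        } else if (jobMax > 0) {
            resolved = jobMax;
        } else {
            resolved = parallelism;
        }
        if (parallelism > resolved) {
            // Every parallel instance needs at least one key group.
            throw new IllegalArgumentException(
                "parallelism " + parallelism + " exceeds max parallelism " + resolved);
        }
        return resolved;
    }

    public static void main(String[] args) {
        System.out.println(resolveMaxParallelism(UNSET, UNSET, 4)); // falls back to the parallelism
        System.out.println(resolveMaxParallelism(UNSET, 128, 4));   // job-wide value
        System.out.println(resolveMaxParallelism(64, 128, 4));      // operator-level value
    }
}
```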

          2) State and StateHandle refactoring
            a) StateHandle refactoring

          We have simplified and cleaned up the hierarchy and use cases of state handles. `StreamStateHandle` and `RetrievableStateHandle` are now at the heart of the state handle system.
          Their main conceptual difference is that `StreamStateHandle` provides a seekable input stream to the actual state data and leaves state reconstruction to client code, whereas `RetrievableStateHandle` gives client code a simple way to retrieve state as a ready-made object, with the state handle implementation taking care of state reconstruction.

            b) Operator serialization

          The unified abstraction for operators to persist their state is `CheckpointStateOutputStream`. All operators can write their serialized state directly into this stream, which returns a `StreamStateHandle` on close. `StreamTaskState` and `StreamTaskStateList` become obsolete. This change makes versioning of operator state serialization formats easier, and we should ensure and test that our operators are aware of serialization versions.

          This change leaves the following methods for snapshot/restore in `StreamOperator`:
          ```
          void snapshotState(
                  FSDataOutputStream out,
                  long checkpointId,
                  long timestamp) throws Exception;

          void restoreState(FSDataInputStream in) throws Exception;
          ```

            c) Split task state into operator state (= non-partitioned state) and keyed-state (= partitioned state)

          We have split the task state into operator state and keyed state as follows.

          Operator state is organized as a `ChainedStateHandle<StreamStateHandle>`. The chained state handle encapsulates the individual `StreamStateHandle` for all operators in an operator chain.

          Keyed state is organized as a `List<KeyGroupsStateHandle>`. Each `KeyGroupsStateHandle` consists of one `StreamStateHandle` and one `KeyGroupRangeOffsets` object. `KeyGroupRangeOffsets` denotes the range of all key groups that are referenced by the handle, together with their respective stream offsets. The `StreamStateHandle` gives access to a seekable stream that contains the actual state data for all key groups from the key group range; the individual key group states are located in the stream at those offsets.
          Notice that we have to provide a list of `KeyGroupsStateHandle` to support the case of scaling down parallelism. In this case, key group states from several `KeyGroupsStateHandle`s (each representing the state of one previously existing operator) may have to be combined to form the keyed state of the reduced number of current operators.
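To make the scale-down case concrete, here is a minimal sketch of how a restoring operator could locate the key groups it now owns across several handles. The `Handle` class is a hypothetical, simplified stand-in for a `KeyGroupsStateHandle` (a stream name instead of a real `StreamStateHandle`, a plain array instead of `KeyGroupRangeOffsets`); none of these names are taken from the pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch only; all names are hypothetical stand-ins. */
public final class KeyedStateRestoreSketch {

    /** A stream plus the offsets of a contiguous key-group range stored in it. */
    public static final class Handle {
        final String streamName;
        final int startKeyGroup;
        final long[] offsets; // offsets[i] belongs to key group startKeyGroup + i

        public Handle(String streamName, int startKeyGroup, long[] offsets) {
            this.streamName = streamName;
            this.startKeyGroup = startKeyGroup;
            this.offsets = offsets;
        }

        int endKeyGroup() {
            return startKeyGroup + offsets.length - 1;
        }
    }

    /** Returns "keyGroup:stream@offset" for every key group in [from, to] found in the handles. */
    public static List<String> locate(List<Handle> handles, int from, int to) {
        List<String> result = new ArrayList<>();
        for (Handle h : handles) {
            // Only the intersection of the requested range and the handle's range is relevant.
            int lo = Math.max(from, h.startKeyGroup);
            int hi = Math.min(to, h.endKeyGroup());
            for (int kg = lo; kg <= hi; kg++) {
                result.add(kg + ":" + h.streamName + "@" + h.offsets[kg - h.startKeyGroup]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two former operators wrote key groups 0-1 and 2-3; one operator now owns 0-3.
        List<Handle> handles = Arrays.asList(
            new Handle("a", 0, new long[] {0L, 100L}),
            new Handle("b", 2, new long[] {0L, 80L}));
        System.out.println(locate(handles, 0, 3));
    }
}
```

When scaling up instead, each new operator would read only a sub-range of a single handle, seeking directly to the stored offsets.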

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink keyed-state

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2376.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2376


          commit 8a57da24b499e059fb73bd7050a96d32b57fcec4
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-07-28T13:08:24Z

          FLINK-4380 Introduce KeyGroupAssigner and Max-Parallelism Parameter

          This introduces a new KeySelector that assigns keys to key groups and
          also adds the max parallelism parameter throughout all API levels.

          This also adds tests for the newly introduced features.

          commit 62fb798b762d8a69d30479561ed43b580facc600
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-11T09:59:07Z

          FLINK-4381 Refactor State to Prepare For Key-Group State Backends

          commit c038d6d9435c329ab4ca06c119cff5456924b5ab
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-08-11T10:14:18Z

          FLINK-4380 Add tests for new Key-Group/Max-Parallelism

          This tests the rescaling features in CheckpointCoordinator and
          SavepointCoordinator.

          commit 751effb855a81e6322a7e897c98dc59ea065d072
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-08-12T09:07:09Z

          Ignore RescalingITCase


          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/2376

          R: @tillrohrmann @StephanEwen for review pls

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2376

          Very good work and very nice code!

          Some comments after a joint review:

          • The most critical issue is that there should not be any blocking on async threads during task shutdown. This unnecessarily delays responses to canceling and redeployment.
          • At this point, the `KeyGroupAssigner` interface seems a bit useless, especially if it is not parametrized with variable key group mappings. For the sake of making this simpler and more efficient, one could just have a static method for that.
          • I would suggest to make the assumption that key groups are always used (they should be, even if their number is equal to the parallelism), and drop the checks for `numberOfKeyGroups > 0`, for example in the KeyGroupHashPartitioner.
          • A bit more difficult is what to assume as the default number of key groups. We thought about assuming a default of `128`. That has no overhead in state backends like RocksDB and also allows initial job deployments which did not think about properly configuring this to have some freedom to scale out. If the parallelism is >= 128, this should probably round to the next highest power-of-two.
          • There are some log statements which cause log flooding, like an INFO log statement for every checkpoint stream (factory) created.
          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter closed the pull request at:

          https://github.com/apache/flink/pull/2376

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/2440

          Keyed backend refactor

          This pull request is a follow-up to the preliminary pull request #2376 and addresses all issues subsumed under FLINK-3755.

          In addition to the changes from PR #2376, this PR adds:

          1) Refactoring of Key Value State

          Before, `AbstractStateBackend` was responsible both for checkpointing to streams and
          for keyed state. Now, this functionality is split up into `CheckpointStreamFactory` and
          `KeyedStateBackend`. The former is responsible for providing streams for writing checkpoints
          while the latter is only responsible for keeping keyed state. A `KeyedStateBackend` can
          write its content to a checkpoint. For this it uses a `CheckpointStreamFactory`.

          2) Introduction of key-group aware `KeyedStateBackend`s
            a) HeapKeyedStateBackend
              `HeapKeyedStateBackend` subsumes the keyed state part of both `FsStateBackend` and
              `MemoryStateBackend`. The only difference between the two now
              is that one produces a `CheckpointStreamFactory` that produces streams for writing to files
              while the other provides streams that write to in-memory byte arrays.

          Also, this introduces another layer of lookup in the `HeapKeyedStateBackend` to accommodate
          storing state per key group. Upon checkpointing, the data is written out in a format that
          is very similar to that of the new RocksDB backend. We should make these 100% compatible as
          a follow-up.

            b) RocksDBKeyedStateBackend

          The RocksDB state backend is now also key-group aware. This happens through an additional
          1-2 byte key-group prefix that is added to each key. On snapshots, the key-value states
          for different key groups are combined through `RocksDBMergeIterator`. All snapshots from
          this backend now run fully asynchronously using an implementation of
          `AbstractAsyncIOCallable`.
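A minimal sketch of such a key-group prefix, assuming the prefix is written big-endian in front of the serialized key so that entries sort by key group first. The threshold for switching from one to two bytes and all names below are assumptions of this sketch, not details taken from the pull request.

```java
import java.util.Arrays;

/** Illustrative sketch only; names and the 1-vs-2 byte threshold are assumptions. */
public final class KeyGroupPrefixSketch {

    /** One byte suffices while every key group index fits into an unsigned byte. */
    public static int prefixBytes(int numberOfKeyGroups) {
        return numberOfKeyGroups > 256 ? 2 : 1;
    }

    /** Prepends the key group index (big-endian) to the serialized key. */
    public static byte[] prefixedKey(int keyGroup, int numberOfKeyGroups, byte[] serializedKey) {
        int n = prefixBytes(numberOfKeyGroups);
        byte[] out = new byte[n + serializedKey.length];
        for (int i = 0; i < n; i++) {
            // Most significant byte first, so byte-wise ordering groups keys by key group.
            out[i] = (byte) (keyGroup >>> (8 * (n - 1 - i)));
        }
        System.arraycopy(serializedKey, 0, out, n, serializedKey.length);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(prefixedKey(5, 128, new byte[] {42})));    // [5, 42]
        System.out.println(Arrays.toString(prefixedKey(300, 1024, new byte[] {42}))); // [1, 44, 42]
    }
}
```

With the key group as the leading bytes, all entries of one key group are adjacent in a byte-wise sorted store, which is what makes writing a snapshot per key group straightforward.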

          Refactoring of asynchronous snapshot facilities

          Snapshots are now driven by `AsyncCheckpointRunnable`s in `StreamTask`, which are executed
          through a thread pool. An `AsyncCheckpointRunnable` is created with a
          `RunnableFuture<KeyGroupsStateHandle>` that is obtained from `KeyedStateBackend` through

          ```
          public abstract RunnableFuture<KeyGroupsStateHandle> snapshot(
                  long checkpointId,
                  long timestamp,
                  CheckpointStreamFactory streamFactory) throws Exception;
          ```
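The `RunnableFuture` pattern can be illustrated with plain `java.util.concurrent` classes. In this sketch the "backend" is a hypothetical method that copies its state synchronously and defers the actual write to the moment the future is run; the `String` result stands in for a `KeyGroupsStateHandle`. None of the names below come from the pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Illustrative sketch only; names are hypothetical, the String result stands in for a state handle. */
public final class AsyncSnapshotSketch {

    /** Synchronous part: copy the state. Asynchronous part: "write" it when the future is run. */
    public static RunnableFuture<String> snapshot(long checkpointId, Map<String, Integer> state) {
        final Map<String, Integer> copy = new HashMap<>(state);
        return new FutureTask<String>(() -> "chk-" + checkpointId + ":" + new TreeMap<>(copy));
    }

    /** Runs the future on the calling thread and unwraps the result. */
    public static String runAndGet(RunnableFuture<String> future) {
        try {
            future.run();
            return future.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Integer> state = new HashMap<>();
        state.put("a", 1);
        RunnableFuture<String> future = snapshot(7L, state);

        // The task thread can continue processing while a pool thread materializes the snapshot.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(future);
        System.out.println(future.get());
        pool.shutdown();
    }
}
```

Returning a `RunnableFuture` rather than an already running task leaves it to the caller to decide on which thread, and when, the expensive part executes.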

          Review comments on #2376

          Based on the review comments on PR #2376, we introduced the following changes:

            1. In comparison to PR #2376, we dropped the KeyGroupAssigner interface in favor of static methods.
              The reason for this is that the code relies on a consistent key group assignment in several places.
            2. By default, the max parallelism is chosen as
              ```
              Math.max(128, roundToNextPowerOfTwo(parallelism + parallelism / 2))
              ```
            3. No blocking on the termination of async snapshot threads.
            4. Reduced logging.
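A minimal sketch of the default max parallelism from item 2, assuming that 128 acts as a lower bound (the review suggested a default of 128 and rounding up to a power of two for larger parallelism) and that `roundToNextPowerOfTwo` rounds up to the nearest power of two. The helper names are hypothetical.

```java
/** Illustrative sketch only; helper names are hypothetical. */
public final class DefaultMaxParallelismSketch {

    /** Smallest power of two that is >= x (for x >= 1). */
    public static int roundUpToPowerOfTwo(int x) {
        if (x <= 1) {
            return 1;
        }
        return Integer.highestOneBit(x - 1) << 1;
    }

    /** Default max parallelism: at least 128, otherwise 1.5x the parallelism rounded up to a power of two. */
    public static int defaultMaxParallelism(int parallelism) {
        return Math.max(128, roundUpToPowerOfTwo(parallelism + parallelism / 2));
    }

    public static void main(String[] args) {
        System.out.println(defaultMaxParallelism(4));   // 128
        System.out.println(defaultMaxParallelism(128)); // 256
        System.out.println(defaultMaxParallelism(200)); // 512
    }
}
```

The extra 50% headroom before rounding leaves room to scale out even for jobs that start with a parallelism that is already a power of two.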
          Limitations

          Currently, queryable state is not key-group aware and `QueryableStateITCase` is ignored. This will
          be solved in follow-up work.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink keyed-backend-refactor

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2440.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2440


          commit 40484f3a66558b40bcf5bcaae3e3dba28d73f8dd
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-07-28T13:08:24Z

          FLINK-4380 Introduce KeyGroupAssigner and Max-Parallelism Parameter

          This introduces a new KeySelector that assigns keys to key groups and
          also adds the max parallelism parameter throughout all API levels.

          This also adds tests for the newly introduced features.

          commit 3609f29076dd504ab9790d874fe3e3d4828f6b77
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-08-10T16:44:50Z

          FLINK-3761 Refactor State Backends/Make Keyed State Key-Group Aware

          The biggest change in this is that functionality that used to be in
          AbstractStateBackend is now moved to CheckpointStreamFactory and
          KeyedStateBackend. The former is responsible for providing streams that
          can be used to checkpoint data while the latter is responsible for
          keeping keyed state. A keyed backend can checkpoint the state that it
          keeps by using a CheckpointStreamFactory.

          This also refactors how asynchronous keyed state snapshots work. They
          are now implemented using a Future/RunnableFuture.

          Also, this changes the keyed state backends to be key-group aware and to
          snapshot the state in key-groups with an index for restoring.

          commit 9d675ca0707b31923108ea78908b63fc46798c97
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-11T09:59:07Z

          FLINK-4381 Refactor State to Prepare For Key-Group State Backends

          commit d99b75c70bb15fe6ee5c06968d92b075e9b6c772
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-08-11T10:14:18Z

          FLINK-4380 Add tests for new Key-Group/Max-Parallelism

          This tests the rescaling features in CheckpointCoordinator and
          SavepointCoordinator.

          commit 1d514b8d5a8db663e1be293b9653c42d45787e36
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-17T12:50:18Z

          FLINK-3761 Refactor RocksDB Backend/Make Key-Group Aware

          This change makes the RocksDB backend key-group aware by building on the
          changes in the previous commit.

          commit 4f791d17f727a2fead12224652747b73d98ffaa1
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-08-25T12:09:12Z

          Ignore QueryableStateITCase

          This doesn't work yet because the state query machinery is not yet
          properly aware of key-grouped state.

          commit f47af43bd7b5e54086ab2ce87b9471cb99a38421
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-29T08:02:31Z

          Introduced timeout of thread pool for testing. Removed legacy code path from HashKeyGroupAssigner

          commit 52061346e39cbc591e79bad2b6a4d9ce272bb558
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-29T09:53:22Z

          Stephan's feedback: remove KeyGroupAssigner in favor of a static method and have default max. parallelism at 128

          commit d77475a69dfdfae092107562af5c96af8de370cb
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-29T14:10:15Z

          Improved test stability

          commit 1252a246bee6bc181b2def83966d121b1ca5688e
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-30T10:26:48Z

          Improved HeapKeyedStateBackend for more compact snapshots.

          commit 3f282619c6dbf48a246bf848be2e92a4cad8bd2d
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-30T11:45:40Z

          Expose max parallelism through StreamExecutionEnvironment

          commit 1fa0e02f33722029159b39625127758fcb3623d3
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-30T12:47:34Z

          test fix

          commit 257992bf3ead38d12775b077478b84f8690c7fb9
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-30T12:59:12Z

          Extended EventTimeWindowCheckpointITCase to test the boundaries of maxParallelism.

          commit ea26c0f2e9f1687b5ae89a1acaeaea681c19bd80
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-30T13:27:25Z

          Reset WindowCheckpointingITCase to (sometimes) failing version.


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/2440 Keyed backend refactor This pull request is a followup to the preliminary pull request #2376 and addresses all issues subsumed under FLINK-3755 In addition to the changes from PR #2376, this PR adds: 1) Refactoring of Key Value State Before, `AbstractStateBackend` was responsible both for checkpointing to streams and for keyed state. Now, this functionality is split up into `CheckpointStreamFactory` and `KeyedStateBackend`. The former is responsible for providing streams for writing checkpoints while the latter is only responsible for keeping keyed state. A `KeyedStateBackend` can write its content to a checkpoint. For this it uses a `CheckpointStreamFactory`. 2) Introduction of key-group aware `KeyedStateBackend`s a) HeapKeyedStateBackend `HeapKeyedStateBackend` subsumes the keyed state part of both `MemStateBackend` and `FsStateBackend` and `MemoryStateBackend`. The only difference between the two now is that one produces a `CheckpointStreamFactory` that produces streams for writing to files while the other provides streams that write to in-memory byte arrays. Also, this introduces another layer of lookup in the `HeapKeyedStateBackend` to accomodate storing state per key group. Upon checkpointing the data is written out in a format that is very similar to the new RocksDB backend. We should make these 100 % compatible as a follow up. b) RocksDBKeyedStateBackend The RocksDB state backend is now also key-group aware. This happens through an additional 1-2 byte key-group prefix that is added to each key. On snapshots, the key value states for different key-groups are combined through `RocksDBMergeIterator`. All snapshots from this backend are now running fully asynchronous using an implimentation of `AbstractAsyncIOCallable`. 
          Refactoring of asynchronous snapshot facilities

          Snapshots are now driven by `AsyncCheckpointRunnable`s in `StreamTask`, which are executed through a thread pool. An `AsyncCheckpointRunnable` is created with a `RunnableFuture<KeyGroupsStateHandle>` that is obtained from `KeyedStateBackend` through

          ```
          public abstract RunnableFuture<KeyGroupsStateHandle> snapshot(
              long checkpointId,
              long timestamp,
              CheckpointStreamFactory streamFactory) throws Exception;
          ```

          Review comments on #2376

          Based on the review comments on PR #2376, we introduced the following changes:

          • In comparison to PR #2376, we dropped the KeyGroupAssigner interface in favor of static methods. The reason for this is that the code relies on a consistent key group assignment in several places.
          • By default, the max parallelism is chosen as
          ```
          Math.min(128 , roundToNextPowerOfTwo(parallelism + parallelism / 2))
          ```
          • No blocking on the termination of async snapshot threads.
          • Reduced logging.

          Limitations

          Currently, queryable state is not key-group aware and `QueryableStateITCase` is ignored. This will be solved in follow-up work.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink keyed-backend-refactor

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2440.patch

          To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

          This closes #2440

          commit 40484f3a66558b40bcf5bcaae3e3dba28d73f8dd
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-07-28T13:08:24Z

          FLINK-4380 Introduce KeyGroupAssigner and Max-Parallelism Parameter

          This introduces a new KeySelector that assigns keys to key groups and also adds the max parallelism parameter throughout all API levels. This also adds tests for the newly introduced features.

          commit 3609f29076dd504ab9790d874fe3e3d4828f6b77
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-08-10T16:44:50Z

          FLINK-3761 Refactor State Backends/Make Keyed State Key-Group Aware

          The biggest change in this is that functionality that used to be in AbstractStateBackend is now moved to CheckpointStreamFactory and KeyedStateBackend. The former is responsible for providing streams that can be used to checkpoint data while the latter is responsible for keeping keyed state. A keyed backend can checkpoint the state that it keeps by using a CheckpointStreamFactory.

          This also refactors how asynchronous keyed state snapshots work. They are now implemented using a Future/RunnableFuture.

          Also, this changes the keyed state backends to be key-group aware and to snapshot the state in key-groups with an index for restoring.

          commit 9d675ca0707b31923108ea78908b63fc46798c97
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-11T09:59:07Z

          FLINK-4381 Refactor State to Prepare For Key-Group State Backends

          commit d99b75c70bb15fe6ee5c06968d92b075e9b6c772
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2016-08-11T10:14:18Z

          FLINK-4380 Add tests for new Key-Group/Max-Parallelism

          This tests the rescaling features in CheckpointCoordinator and SavepointCoordinator.

          commit 1d514b8d5a8db663e1be293b9653c42d45787e36
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-17T12:50:18Z

          FLINK-3761 Refactor RocksDB Backend/Make Key-Group Aware

          This change makes the RocksDB backend key-group aware by building on the changes in the previous commit.

          commit 4f791d17f727a2fead12224652747b73d98ffaa1
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2016-08-25T12:09:12Z

          Ignore QueryableStateITCase

          This doesn't work yet because the state query machinery is not yet properly aware of key-grouped state.

          commit f47af43bd7b5e54086ab2ce87b9471cb99a38421
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-29T08:02:31Z

          Introduced timeout of thread pool for testing. Removed legacy code path from HashKeyGroupAssigner

          commit 52061346e39cbc591e79bad2b6a4d9ce272bb558
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-29T09:53:22Z

          Stephan's feedback: remove KeyGroupAssigner in favor of a static method and have default max. parallelism at 128

          commit d77475a69dfdfae092107562af5c96af8de370cb
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-29T14:10:15Z

          Improved test stability

          commit 1252a246bee6bc181b2def83966d121b1ca5688e
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-30T10:26:48Z

          Improved HeapKeyedStateBackend for more compact snapshots.

          commit 3f282619c6dbf48a246bf848be2e92a4cad8bd2d
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-30T11:45:40Z

          Expose max parallelism through StreamExecutionEnvironment

          commit 1fa0e02f33722029159b39625127758fcb3623d3
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-30T12:47:34Z

          test fix

          commit 257992bf3ead38d12775b077478b84f8690c7fb9
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-30T12:59:12Z

          Extended EventTimeWindowCheckpointITCase to test the boundaries of maxParallelism.

          commit ea26c0f2e9f1687b5ae89a1acaeaea681c19bd80
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2016-08-30T13:27:25Z

          Reset WindowCheckpointingITCase to (sometimes) failing version.
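
The `RunnableFuture`-based snapshot hand-off described in this pull request can be illustrated with a minimal, self-contained sketch. It is not the Flink implementation: the class `AsyncSnapshotSketch`, its methods, and the `String` result (standing in for `KeyGroupsStateHandle`) are hypothetical names chosen for illustration, and only `java.util.concurrent` classes are used. The point is the pattern: the backend returns work that has not run yet, and the caller decides to run it on a pool thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

final class AsyncSnapshotSketch {

    /**
     * Hypothetical stand-in for a backend's snapshot(...) method: it returns
     * a future whose work has not started yet. The String result is an
     * assumption that replaces the real state handle type.
     */
    static RunnableFuture<String> snapshot(long checkpointId, long timestamp) {
        return new FutureTask<String>(
            () -> "handle-for-checkpoint-" + checkpointId + "@" + timestamp);
    }

    /** Runs the snapshot on a pool thread and waits for the resulting handle. */
    static String runAsync(long checkpointId, long timestamp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            RunnableFuture<String> future = snapshot(checkpointId, timestamp);
            pool.execute(future); // the calling thread only hands the work over
            return future.get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("snapshot failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAsync(42L, 1000L));
    }
}
```

In this sketch the caller blocks on `get(...)` only to show the result; a caller that must not block would instead complete the checkpoint from the pool thread once the future is done.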
          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter closed the pull request at:

          https://github.com/apache/flink/pull/2440

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2440#discussion_r77856665

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java —
          @@ -0,0 +1,97 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.state;
          +
          +import org.apache.flink.util.MathUtils;
          +import org.apache.flink.util.Preconditions;
          +
          +public final class KeyGroupRangeAssignment {
          +
          + public static final int DEFAULT_MAX_PARALLELISM = 128;
          +
          + private KeyGroupRangeAssignment() {
          + throw new AssertionError();
          + }
          +
          + /**
          + * Assigns the given key to a parallel operator index.
          + *
          + * @param key the key to assign
          + * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
          + * @param parallelism the current parallelism of the operator
          + * @return the index of the parallel operator to which the given key should be routed.
          + */
          + public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
          + return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
          + }
          +
          + /**
          + * Assigns the given key to a key-group index.
          + *
          + * @param key the key to assign
          + * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
          + * @return the key-group to which the given key is assigned
          + */
          + public static final int assignToKeyGroup(Object key, int maxParallelism) {
          + return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
          + }
          +
          + /**
          + * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
          + * parallelism.
          + *
          + * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
          + * to go beyond this boundary, this method must perform arithmetic on long values.
          + *
          + * @param maxParallelism Maximal parallelism that the job was initially created with.
          + * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism.
          + * @param operatorIndex Id of a key-group. 0 <= keyGroupID < maxParallelism.
          + * @return
          + */
          + public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
          + int maxParallelism,
          + int parallelism,
          + int operatorIndex) {
          + Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero.");
          + Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism.");
          + Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15.");
          +
          + int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
          + int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
          — End diff –

          Can't we simplify this expression to
          ```
          int start = (operatorIndex * maxParallelism) / parallelism;
          int end = ((operatorIndex + 1) * maxParallelism) / parallelism - 1;
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/2440

          I don't think that this is giving us the correct inverse for `computeOperatorIndexForKeyGroup(...)`. Our test `CheckpointCoordinatorTest::testCreateKeyGroupPartitions()` generates counter-examples.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2440#discussion_r77870173

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java —
          + int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
          + int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
          — End diff –

          I don't think that this is giving us the correct inverse for computeOperatorIndexForKeyGroup(...). Our test CheckpointCoordinatorTest::testCreateKeyGroupPartitions() generates counter-examples.
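
A minimal sketch of such a counter-example follows. It rests on one assumption that the quoted diff does not confirm: that `computeOperatorIndexForKeyGroup(...)` maps a key group to `keyGroup * parallelism / maxParallelism` using integer division. The class and method names are hypothetical. Under that assumption, with `maxParallelism = 4` and `parallelism = 3`, the range formula from the diff gives operator 0 the key groups [0, 1], while the simplified formula gives it only [0, 0], so key group 1 is routed to operator 0 but is not contained in its range.

```java
final class KeyGroupRangeSketch {

    /** ASSUMPTION: body of computeOperatorIndexForKeyGroup, which the diff does not show. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /**
     * Returns {start, end} (both inclusive) for one operator, using either the
     * formula from the diff or the simplified formula proposed in the review.
     */
    static int[] range(boolean simplified, int maxParallelism, int parallelism, int operatorIndex) {
        if (simplified) {
            int start = (operatorIndex * maxParallelism) / parallelism;
            int end = ((operatorIndex + 1) * maxParallelism) / parallelism - 1;
            return new int[] {start, end};
        }
        int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** True if every key group lies inside the range of the operator it is routed to. */
    static boolean isInverse(boolean simplified, int maxParallelism, int parallelism) {
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            int operator = operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            int[] r = range(simplified, maxParallelism, parallelism, operator);
            if (keyGroup < r[0] || keyGroup > r[1]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("formula from diff is inverse: " + isInverse(false, 4, 3));
        System.out.println("simplified formula is inverse: " + isInverse(true, 4, 3));
    }
}
```

The difference comes from rounding: the formula in the diff rounds the range boundaries up, which matches the round-down in the assumed key-group-to-operator mapping, whereas the simplified formula rounds them down.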

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2440#discussion_r77956781

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java —
          + int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
          + int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
          — End diff –

          Ah I see. Now it makes sense


            People

            • Assignee:
              till.rohrmann Till Rohrmann
              Reporter:
              till.rohrmann Till Rohrmann