Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-24639

Improve assignment of Kinesis shards to subtasks




      The default assigner of Kinesis shards to Flink subtasks simply takes the hashCode() of the StreamShardHandle (an integer), which is then interpreted modulo the number of subtasks. This basically does random-ish but deterministic assignment of shards to subtasks.

      However, this can lead to some subtasks getting several times the number of shards as others. To prevent those unlucky subtasks from being overloaded, the overall Flink cluster must be over-provisioned, so that each subtask has more headroom to handle any over-assignment of shards.

      We can do better here, at least if Kinesis is being used in a common way. Each record sent to a Kinesis stream has a particular hash key in the range [0, 2^128), which is used to determine which shard gets used; each shard has an assigned range of hash keys. By default Kinesis assigns each shard equal fractions of the hash-key space. And when you scale up or down using UpdateShardCount, it tries to maintain equal fractions to the extent possible. Also, a shard's hash key range is fixed at creation; it can only be replaced by new shards, which split it, or merge it.

      Given the above, one way to assign shards to subtasks is to do a linear mapping from hash-keys in range [0, 2^128) to subtask indices in [0, nSubtasks). For the 'coordinate' of each shard we pick the middle of the shard's range, to ensure neither subtask 0 nor subtask (n-1) is assigned too many.

      However this will probably not be helpful for Kinesis users that don't randomly assign partition or hash keys to Kinesis records. The existing assigner is probably better for them.

      I ran a simulation of the default shard assigner versus some alternatives, using shards taken from one of our Kinesis streams; results attached. The measure I used I call 'overload' and it measures how many times more shards the most heavily-loaded subtask has than is necessary. (DEFAULT is the default assigner, Sha256 is similar to the default but with a stronger hashing function, ShardId extracts the shard number from the shardId and uses that, and HashKey is the one I describe above.)

      Patch is at: https://github.com/apache/flink/compare/master...john-karp:uniform-shard-assigner?expand=1


        Issue Links



              jkarp John Karp
              jkarp John Karp
              0 Vote for this issue
              3 Start watching this issue