Kafka / KAFKA-687

Rebalance algorithm should consider partitions from all topics

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.0
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      The current rebalance step, as described in the original Kafka paper [1], splits the partitions of each topic between all the consumers independently. So if you have 100 topics with 2 partitions each and 10 consumers, only two consumers will be used. That is, for each topic, all partitions are listed and shared among the consumers in the group in order (not randomly).

      If the consumer group is reading from several topics at the same time, it makes sense to split all the partitions from all topics between all the consumers. Following the example, we would have 200 partitions in total, 20 per consumer, using all 10 consumers.

      The load per topic could differ, and the division should take this into account. However, even a random division should be better than the current algorithm when reading from several topics, and it should not harm reading from a few topics with several partitions.
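      To make the contrast concrete, here is a minimal sketch (in Java, with hypothetical names; not Kafka code). The per-topic variant is simplified - the real algorithm carves contiguous ranges per topic - but the skew is identical whenever a topic has fewer partitions than there are consumers:

          import java.util.*;

          public class RebalanceSketch {
              // Current behavior: each topic's partitions are divided among the
              // sorted consumers independently, so a 2-partition topic only ever
              // occupies the first two consumers, no matter how many topics exist.
              static Map<String, List<String>> perTopic(List<String> consumers,
                                                        Map<String, Integer> partitionsPerTopic) {
                  List<String> sorted = new ArrayList<>(consumers);
                  Collections.sort(sorted);
                  Map<String, List<String>> owned = new TreeMap<>();
                  for (String c : sorted) owned.put(c, new ArrayList<>());
                  for (Map.Entry<String, Integer> t : partitionsPerTopic.entrySet())
                      for (int p = 0; p < t.getValue(); p++)
                          owned.get(sorted.get(p % sorted.size())).add(t.getKey() + "-" + p);
                  return owned;
              }

              // Proposed behavior: pool every partition of every topic and deal the
              // pool out across all consumers, so 100 topics x 2 partitions over
              // 10 consumers gives each consumer exactly 20 partitions.
              static Map<String, List<String>> pooled(List<String> consumers,
                                                      Map<String, Integer> partitionsPerTopic) {
                  List<String> sorted = new ArrayList<>(consumers);
                  Collections.sort(sorted);
                  Map<String, List<String>> owned = new TreeMap<>();
                  for (String c : sorted) owned.put(c, new ArrayList<>());
                  int i = 0;
                  for (Map.Entry<String, Integer> t : partitionsPerTopic.entrySet())
                      for (int p = 0; p < t.getValue(); p++)
                          owned.get(sorted.get(i++ % sorted.size())).add(t.getKey() + "-" + p);
                  return owned;
              }

              public static void main(String[] args) {
                  Map<String, Integer> topics = new TreeMap<>();
                  for (int t = 0; t < 100; t++) topics.put("topic" + t, 2);
                  List<String> consumers = new ArrayList<>();
                  for (int c = 0; c < 10; c++) consumers.add("consumer" + c);
                  // perTopic: two consumers own 100 partitions each, eight own none.
                  perTopic(consumers, topics).forEach((c, ps) -> System.out.println(c + ": " + ps.size()));
                  // pooled: every consumer owns exactly 20 partitions.
                  pooled(consumers, topics).forEach((c, ps) -> System.out.println(c + ": " + ps.size()));
              }
          }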

      Attachments

      1. KAFKA-687_2014-08-28_16:20:25.patch
        36 kB
        Joel Koshy
      2. KAFKA-687_2014-08-25_12:36:48.patch
        42 kB
        Joel Koshy
      3. KAFKA-687_2014-08-20_18:09:28.patch
        45 kB
        Joel Koshy
      4. KAFKA-687_2014-08-19_12:07:37.patch
        40 kB
        Joel Koshy
      5. KAFKA-687_2014-07-18_15:55:15.patch
        228 kB
        Joel Koshy
      6. KAFKA-687.patch
        38 kB
        Joel Koshy


          Activity

          Jay Kreps added a comment -

          This is a very good point, and not one I had considered.

          It is probably not a trivial change because right now I think the election is done for each topic independently.

          We have in mind, for the next major release after 0.8 (0.9, presumably), to move this coordination to the server, which would be a good time to fix this. We could either do this balancing exactly, or else just randomize the start index (which would be almost as good if you had many topics).
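
          A sketch of that randomized-start idea (illustrative only; nothing here is actual Kafka code):

              import java.util.List;

              // Derive a deterministic per-topic offset so each topic's assignment
              // begins at a different consumer. Many small topics then spread across
              // the whole group, while every member still computes the same answer.
              final class RandomizedStart {
                  static String ownerOf(String topic, int partition, List<String> sortedConsumers) {
                      int start = Math.floorMod(topic.hashCode(), sortedConsumers.size());
                      return sortedConsumers.get((start + partition) % sortedConsumers.size());
                  }
              }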

          Joel Koshy added a comment -

          Although we are working on the new consumer, some people were interested (offline) in getting some form of this done in the current consumer, at least for wildcard consumption. It should (in theory) be simple to do, but may take a couple of days because the wildcard-consumption part of the code is convoluted - mainly because, when it was written, we did not want to modify the existing consumer too much.

          Anyway, I dumped some thoughts in the comments of this gist: https://gist.github.com/jjkoshy/5c3d065161153b7b1ee3 and the unit test at the end provides one possible partition layout strategy.

          Joel Koshy added a comment -

          Created reviewboard https://reviews.apache.org/r/23655/diff/
          against branch origin/trunk

          Joel Koshy added a comment -

          I ended up abandoning the earlier approach from the above gist and went with what I think is a simpler approach. The layout algorithms are the result of discussions with Clark Haskins.

          Joel Koshy added a comment -

          Updated reviewboard https://reviews.apache.org/r/23655/diff/
          against branch origin/trunk

          Joel Koshy added a comment -

          Short update on this:

          After the initial review comments, I was trying to make the allocation module more generic so we can reuse it in the new consumer. Furthermore, I was trying to get rid of the "symmetric" mode (which is for wildcards only, with identical subscriptions across all consumers) and make "roundrobin" more general. The basic approach was to sort the consumer IDs based on a hash of the consumer ID with the topic appended to it - effectively scrambling (in a consistent order) the list of consumer streams available for a given topic - and then doing a round-robin assignment across the available partitions of the topic. This did not actually work as well as expected. Here is the output of some simulations (a sketch of this scrambling scheme follows the output below):

              [2014-07-25 17:00:35,559] INFO Owned count summary for 6284 partitions across 63 consumer ids (9 consumers with 7 streams): min: 8.000000; max: 200.000000; avg: 99.746032; stddev: 58.871914; ideal: 99.746033 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:00:36,791] INFO Owned count summary for 6118 partitions across 42 consumer ids (7 consumers with 6 streams): min: 57.000000; max: 254.000000; avg: 145.666667; stddev: 60.954468; ideal: 145.666672 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:00:38,065] INFO Owned count summary for 10652 partitions across 88 consumer ids (11 consumers with 8 streams): min: 4.000000; max: 335.000000; avg: 169.079365; stddev: 101.093266; ideal: 121.045456 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:02:07,198] INFO Owned count summary for 10839 partitions across 200 consumer ids (20 consumers with 10 streams): min: 3.000000; max: 330.000000; avg: 172.047619; stddev: 99.267223; ideal: 54.195000 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:24:35,676] INFO Owned count summary for 6439 partitions across 12 consumer ids (2 consumers with 6 streams): min: 445.000000; max: 626.000000; avg: 536.583333; stddev: 58.445714; ideal: 536.583313 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:24:36,787] INFO Owned count summary for 11777 partitions across 63 consumer ids (7 consumers with 9 streams): min: 5.000000; max: 369.000000; avg: 186.936508; stddev: 113.972531; ideal: 186.936508 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:25:20,108] INFO Owned count summary for 10488 partitions across 144 consumer ids (18 consumers with 8 streams): min: 8.000000; max: 335.000000; avg: 166.476190; stddev: 101.988433; ideal: 72.833336 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:33:52,532] INFO Owned count summary for 5783 partitions across 25 consumer ids (5 consumers with 5 streams): min: 141.000000; max: 336.000000; avg: 231.320000; stddev: 69.337171; ideal: 231.320007 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:33:53,268] INFO Owned count summary for 6181 partitions across 7 consumer ids (7 consumers with 1 streams): min: 801.000000; max: 980.000000; avg: 883.000000; stddev: 59.654561; ideal: 883.000000 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:33:56,124] INFO Owned count summary for 6475 partitions across 32 consumer ids (4 consumers with 8 streams): min: 105.000000; max: 299.000000; avg: 202.343750; stddev: 62.999544; ideal: 202.343750 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:35:10,370] INFO Owned count summary for 7739 partitions across 162 consumer ids (18 consumers with 9 streams): min: 6.000000; max: 239.000000; avg: 122.841270; stddev: 69.379788; ideal: 47.771606 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:35:11,834] INFO Owned count summary for 9070 partitions across 14 consumer ids (2 consumers with 7 streams): min: 520.000000; max: 774.000000; avg: 647.857143; stddev: 84.860843; ideal: 647.857117 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:36:37,935] INFO Owned count summary for 10933 partitions across 85 consumer ids (17 consumers with 5 streams): min: 5.000000; max: 350.000000; avg: 173.539683; stddev: 105.619192; ideal: 128.623535 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:36:40,641] INFO Owned count summary for 8665 partitions across 64 consumer ids (8 consumers with 8 streams): min: 4.000000; max: 267.000000; avg: 137.539683; stddev: 82.121434; ideal: 135.390625 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:36:42,612] INFO Owned count summary for 8432 partitions across 48 consumer ids (6 consumers with 8 streams): min: 68.000000; max: 328.000000; avg: 175.666667; stddev: 78.829828; ideal: 175.666672 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:36:44,010] INFO Owned count summary for 6899 partitions across 110 consumer ids (11 consumers with 10 streams): min: 2.000000; max: 204.000000; avg: 109.507937; stddev: 61.068146; ideal: 62.718182 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:37:57,128] INFO Owned count summary for 8622 partitions across 48 consumer ids (6 consumers with 8 streams): min: 61.000000; max: 324.000000; avg: 179.625000; stddev: 76.374114; ideal: 179.625000 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:37:58,873] INFO Owned count summary for 6288 partitions across 84 consumer ids (12 consumers with 7 streams): min: 1.000000; max: 200.000000; avg: 99.809524; stddev: 62.132236; ideal: 74.857140 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:37:59,913] INFO Owned count summary for 6467 partitions across 91 consumer ids (13 consumers with 7 streams): min: 5.000000; max: 200.000000; avg: 102.650794; stddev: 59.990096; ideal: 71.065933 (unit.kafka.consumer.PartitionAllocatorTest:68)
              [2014-07-25 17:38:01,621] INFO Owned count summary for 6311 partitions across 8 consumer ids (2 consumers with 4 streams): min: 716.000000; max: 869.000000; avg: 788.875000; stddev: 53.799728; ideal: 788.875000 (unit.kafka.consumer.PartitionAllocatorTest:68)
          

          Will think more about it and reconsider keeping the specialized case (which is actually the common case - i.e., consuming with wildcards and identical subscriptions).
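
          For concreteness, the scrambling scheme described above amounts to something like the following sketch (a reading of the comment, not the committed patch):

              import java.util.*;

              // Order the streams by hash(streamId + topic) - consistent across all
              // consumers, but effectively random per topic - then round-robin the
              // topic's partitions over that order. The simulations above show the
              // weakness: each topic scrambles independently, so the per-topic
              // assignments do not compose into an even global load.
              final class ScrambledRoundRobin {
                  static Map<String, List<Integer>> assignTopic(String topic, int numPartitions,
                                                                List<String> streamIds) {
                      List<String> scrambled = new ArrayList<>(streamIds);
                      scrambled.sort(Comparator.comparingInt((String id) -> (id + topic).hashCode()));
                      Map<String, List<Integer>> owned = new HashMap<>();
                      for (String id : scrambled) owned.put(id, new ArrayList<>());
                      for (int p = 0; p < numPartitions; p++)
                          owned.get(scrambled.get(p % scrambled.size())).add(p);
                      return owned;
                  }
              }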

          Joel Koshy added a comment -

          Jun Rao, I was thinking this over a little more, and I feel it is better not to design the new consumer's partition allocator API in this JIRA, for a couple of reasons:

          • The new consumer's allocator interface requirements and desired implementations will be known precisely only when we get to it - i.e., when we are implementing partition assignment in the new consumer. So we will most likely change the API anyway at that point.
          • The allocation code is not very complicated, so it is not a lot of work to rewrite it in the new consumer implementation.
          • With the "more general" API that we discussed, the range allocation can no longer be an exact copy (unlike in the original patch). I would prefer to avoid touching the range partitioner in the existing consumer at this point, since that is the default most people use.

          So what I would propose is the following: keep the partition allocation interface as in the original patch and provide only one more allocation implementation: roundrobin. This scheme is legal only when all consumer instances use wildcards and all the regexes are identical (although stream counts can differ).
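
          For reference, this is the shape the feature eventually took in the old high-level consumer: a partition.assignment.strategy property accepting "roundrobin" alongside the default "range". A usage sketch (API names from the 0.8.2-era consumer; double-check against your release):

              import java.util.Properties;
              import kafka.consumer.Consumer;
              import kafka.consumer.ConsumerConfig;
              import kafka.javaapi.consumer.ConsumerConnector;

              public class RoundRobinConfigExample {
                  public static void main(String[] args) {
                      // Remember the constraint above: "roundrobin" is legal only when
                      // every instance in the group uses the same wildcard subscription.
                      Properties props = new Properties();
                      props.put("zookeeper.connect", "localhost:2181");
                      props.put("group.id", "wildcard-group");
                      props.put("partition.assignment.strategy", "roundrobin"); // default: "range"
                      ConsumerConnector connector =
                          Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
                  }
              }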

          Jun Rao added a comment -

          Yes, that sounds reasonable. We can try to minimize the changes in the old consumer.

          Joel Koshy added a comment -

          Updated reviewboard https://reviews.apache.org/r/23655/diff/
          against branch origin/trunk

          Joel Koshy added a comment -

          Updated reviewboard https://reviews.apache.org/r/23655/
          against branch origin/trunk

          Joel Koshy added a comment -

          Updated reviewboard https://reviews.apache.org/r/23655/
          against branch origin/trunk

          Joel Koshy added a comment -

          Updated reviewboard https://reviews.apache.org/r/23655/
          against branch origin/trunk

          Joel Koshy added a comment -

          Addressed follow-up comments and committed to trunk.


            People

            • Assignee: Joel Koshy
            • Reporter: Pablo Barrera
            • Reviewer: Jun Rao
            • Votes: 3
            • Watchers: 8
