
KAFKA-347: change number of partitions of a topic online

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.1
    • Component/s: core
    • Labels:

      Description

      We will need an admin tool to change the number of partitions of a topic online.

      Attachments

      1. kafka-347.patch
        49 kB
        Sriram Subramanian
      2. kafka-347-v2.patch
        55 kB
        Sriram Subramanian
      3. KAFKA-347-v2-rebased.patch
        46 kB
        Sriram Subramanian
      4. KAFKA-347-v3.patch
        52 kB
        Sriram Subramanian
      5. KAFKA-347-v4.patch
        59 kB
        Sriram Subramanian
      6. KAFKA-347-v5.patch
        61 kB
        Sriram Subramanian

        Activity

        Jun Rao created issue -
        Joel Koshy made changes -
        Field Original Value New Value
        Labels optimization
        Neha Narkhede made changes -
        Labels optimization features
        Jun Rao made changes -
        Original Estimate 336h [ 1209600 ]
        Remaining Estimate 336h [ 1209600 ]
        Jay Kreps added a comment -

        This is more of a nice-to-have, no? Can't we postpone this beyond 0.8? Changing partition # will scramble the partitioning from the producer, and we don't really have this capability now.

        Jun Rao added a comment -

        It can probably be pushed beyond 0.8. The only concern is that consumer parallelism is currently tied to # partitions.

        Maxime Brugidou added a comment -

        It's sort of out of scope, but I think that using some sort of consistent hashing for partition assignment could really help here. The process could be: if you activate "automatic partition assignment", you don't need to send the ReassignPartition admin command to manage partitions; instead you over-partition and spread the partitions of each topic over brokers using consistent hashing. This would:

        • Minimize the number of partitions moved when you add/remove brokers
        • Partitions would stick to a broker when some partitions are added/deleted
        • Could be run on a regular basis by brokers or other means, and would be perfectly consistent

        This would greatly improve operations.
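
        To make the consistent-hashing idea above concrete, here is a toy sketch of a hash ring that assigns partitions to brokers. It is purely illustrative and not part of Kafka; the hash function, virtual-node count, and names are arbitrary choices.

          // Toy consistent-hash ring mapping (topic, partition) to a broker id.
          // Adding or removing a broker only remaps the partitions whose ring
          // positions fall on the arcs owned by the changed broker.
          import scala.util.hashing.MurmurHash3

          class ConsistentHashRing(brokers: Seq[Int], virtualNodes: Int = 100) {
            private val ring = new java.util.TreeMap[Int, Int]()
            for (b <- brokers; v <- 0 until virtualNodes)
              ring.put(MurmurHash3.stringHash(s"broker-$b-vnode-$v"), b)

            def brokerFor(topic: String, partition: Int): Int = {
              val h = MurmurHash3.stringHash(s"$topic-$partition")
              // Walk clockwise to the next virtual node; wrap around at the end of the ring.
              Option(ring.ceilingEntry(h)).getOrElse(ring.firstEntry()).getValue
            }
          }

          object RingDemo {
            def main(args: Array[String]): Unit = {
              val before = new ConsistentHashRing(Seq(0, 1, 2))
              val after  = new ConsistentHashRing(Seq(0, 1, 2, 3))
              val moved  = (0 until 64).count(p => before.brokerFor("events", p) != after.brokerFor("events", p))
              println(s"partitions moved after adding a broker: $moved of 64") // roughly a quarter, not all
            }
          }

        Note that this only addresses partition => broker placement; it does not by itself change the key => partition mapping discussed in the following comments.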

        Jay Kreps added a comment -

        Well, I think there are really two mappings here:
        key => partition
        and
        partition => broker

        This is the generalization of consistent hashing that most persistent data systems use.

        In Kafka key=>partition is user-defined (Partitioner interface) and defaults to just hash(key)%num_partitions. partition=>broker is assigned at topic creation time and from then on is semi-static (changing it is an admin command). So when adding a broker we already can move just the number of partitions we need by having the tool compute the best set of partitions to migrate or choosing at random.

        So the idea is that you over-partition and then the partition count doesn't change and hence the key=>partition assignment doesn't change.

        The question is, do we need to support changing the number of partitions to handle the case where you don't over-partition by enough? If you do this then the change in mapping would be large. That could be helped a bit by a consistent hash partitioner for the key=>partition mapping on the client side, but even in that case you would still have lots of values that are now in the wrong partition, so any code that depended on the partitioning would be broken.

        Alternately you could do the hard work of actually implementing partition splitting on the broker by having the broker split a partition into two and then migrating the new partitions.

        The question I would ask is, is any of this worth it? Many data systems don't support partition splitting; they just say "choose your partition count wisely or else delete it and start fresh". Arguably most messaging use cases are particularly concerned with recent data, so this might be a fine answer. So an alternate strategy would just be to spend the time working on scaling the number of partitions we can handle and over-partitioning heavily.
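
        As a concrete illustration of the default key => partition mapping described above, here is a minimal sketch. It is illustrative only; Kafka's actual Partitioner interface and default partitioner may differ in detail.

          // Minimal illustration of hash(key) % num_partitions and why changing the
          // partition count scrambles the key => partition mapping. Not Kafka's code.
          trait Partitioner[K] {
            def partition(key: K, numPartitions: Int): Int
          }

          class HashPartitioner[K] extends Partitioner[K] {
            def partition(key: K, numPartitions: Int): Int =
              (key.hashCode & Int.MaxValue) % numPartitions // mask keeps the hash non-negative
          }

          object PartitionScramble {
            def main(args: Array[String]): Unit = {
              val p = new HashPartitioner[String]
              // The same key usually lands in a different partition once the partition count changes.
              println(p.partition("user-42", 8))  // partition under 8 partitions
              println(p.partition("user-42", 12)) // typically a different partition under 12
            }
          }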

        Maxime Brugidou added a comment -

        Agreed. Except you also have to think about people not using keyed partitioning. It would be very convenient to add (maybe not remove) partitions as your topic gets larger and you increase your cluster size. And that could be done online since the number of partitions in this case is only used for scaling and distribution.

        (and I think the auto partition balancing is off-topic and I should probably open another JIRA)

        Show
        Maxime Brugidou added a comment - Agreed. Except you also have to think about people not using keyed partitioning. It would be very convenient to add (maybe not remove) partitions as your topic gets larger and you increase your cluster size. And that could be done online since the number of partitions in this case is only used for scaling and distribution. (and I think the auto partition balancing is off-topic and I should probably open another JIRA)
        Joe Stein made changes -
        Parent KAFKA-50 [ 12514687 ]
        Issue Type Sub-task [ 7 ] Improvement [ 4 ]
        Joe Stein made changes -
        Fix Version/s 0.8.1 [ 12322960 ]
        Sriram Subramanian made changes -
        Assignee Sriram Subramanian [ sriramsub ]
        Sriram Subramanian added a comment -

        Going to look into this

        Sriram Subramanian added a comment -

        1. Added an isolated change to dynamically add partitions for messages without a key
        2. The only common code change is the replica layout strategy
        3. Unit tests

        Sriram Subramanian made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Sriram Subramanian made changes -
        Attachment kafka-347.patch [ 12592010 ]
        Guozhang Wang added a comment -

        A few comments:

        1. In the addPartitions function, we can set partitionStartIndex to the last partition of the topic instead of partition 0, since if we keep starting from partition 0, then as this function gets called more and more, the broker owning partition 0's leader will have more and more leader partitions assigned to it.

        2. Some of the changes in ZookeeperConsumerConnector for KAFKA-969 are also included in the patch. It would be better to trim them out of this patch.

        3. Are there two getManualReplicaAssignment implementations, in AddPartitionsCommand and CreateTopicCommand, differing by only one parameter? If so, could we combine them into one?

        Some minor stuff:

        1. In AdminUtils, there are two extra spaces of indentation on the line "ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData)"

        2. In createOrUpdateTopicPartitionAssignmentPathInZK, use

           if {
             // ...
           } else {
             // ...
           }
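
        For illustration only, the create-or-update shape being discussed might look roughly like the sketch below. The method name and the ZkUtils.updatePersistentPath call are taken from the comments above; the existence check and the create branch are assumptions, not the actual patch.

          import org.I0Itec.zkclient.ZkClient
          import kafka.utils.ZkUtils

          object AdminUtilsSketch {
            // Hypothetical sketch of the if/else branch discussed above -- not the actual patch.
            def createOrUpdateTopicPartitionAssignmentPathInZK(zkClient: ZkClient,
                                                               topic: String,
                                                               jsonPartitionData: String): Unit = {
              val zkPath = ZkUtils.getTopicPath(topic)
              if (!zkClient.exists(zkPath)) {
                // Topic creation: write the assignment for a brand-new topic
                ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionData)
              } else {
                // Adding partitions: overwrite the assignment with the enlarged map
                ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionData)
              }
            }
          }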
        Sriram Subramanian added a comment -

        1. That would not give you the right result. The start index is for the replica of the first partition we create. This is to continue from where we left off for that topic.

        2. That is on purpose. There is no change, just an if/else fix.

        3. This is on purpose to prevent any regression for 0.8.

        Joel Koshy added a comment -

        Thank you for the patch. Couple of comments, all very minor:

        AddPartitionsCommand:

        • IMO it is more intuitive for the option to be: "total partitions desired"
          as opposed to "num partitions to add"
        • It is a bit odd that we can allow some partitions with a different
          replication factor from what's already there. I don't see any issues with
          it though. I just think it's a bit odd. One potential issue is if
          producers explicitly want to set acks to say 3 when there are some
          partitions with a replication factor of 2 and some with 3 (However,
          producers really should be using -1 in which case it would be fine).
        • I think the command can currently allow an unintentional reassignment of
          replicas since the persistent path is always updated. (or no?) I think
          this can be easily checked for and avoided.
        • Apart from start partition id I think getManualReplicaAssignment is
          identical to CreateTopicCommand's - maybe that code can be moved into
          AdminUtils?

        KafkaController:

        • nitpick: ZkUtils.getAllTopics(zkClient).foreach(p =>
          partitionStateMachine.registerPartitionChangeListener(p)) (can you change
          p to t? p really looks like a partition, but it is a topic)

        AdminUtils:

        • the //"for testing only" comment is now misplaced.
        • This code is pre-existing, but would prefer changing secondReplicaShift to
          nextReplicaShift.
        • Any reason why AddPartitionsTest should not be within AdminTest?
        • Finally, can you rebase? Sorry for not getting to this patch sooner.
        Neha Narkhede added a comment -

        Overall, this looks great, and sorry for not getting to this patch earlier. A few minor comments -

        1. KafkaController

        1.1 We can read the list of all topics from cache instead of reading from zookeeper, since initializeControllerContext already does that
        1.2 In onNewTopicCreation, what was the motivation to move onNewPartitionCreation to before the registration of the listener?
        2. PartitionStateMachine.AddPartitionsListener

        controllerContext.partitionReplicaAssignment gets populated during the NewPartition state transition.
        Due to this, it is best to get rid of the following in the listener; it should happen as part of the NewPartition state change:
        controllerContext.partitionReplicaAssignment.++=(partitionsRemainingToBeAdded)

        3. AdminUtils

        In createOrUpdateTopicPartitionAssignmentPathInZK, please change topic creation -> Topic creation

        4. There is a conflict in ZookeeperConsumerConnector

        Sriram Subramanian made changes -
        Attachment kafka-347-v2.patch [ 12593224 ]
        Sriram Subramanian added a comment -

        Joel

        • I did not see a big difference in the meaning
        • good point. I now check if the replication factor specified is the same as that of the topic.
        • I don't think so. We only add new rows.
        • This was done to reduce regression.
        • Done
        • Done
        • Done
        • I have a bunch of initial setup that is not present in that test, and those tests don't require it

        Neha

        • Done
        • It seemed safer and more intuitive to register a listener for a topic after it is created
        • Done
        • Done
        • Done
        Joel Koshy added a comment -

        Thanks for patch v2. I'm +1 on this as is, but if you can address some of these minor comments that would be great.

        v2.1 - For "num partitions to add" vs "partitions desired" - all I meant was that most of the time users would think
        of "desired number of partitions" vs "how many more to add". E.g., I have eight partitions for a topic, I now want
        20 instead. It is more convenient to just say I want "20" partitions instead of thinking of how many to add. OTOH, since
        we don't support reducing partitions, treating it as a "num partitions to add" is safer. So I don't feel very strongly
        about it either way.

        v2.2 - Re: unintentional reassignment of partitions. Yes you are right.

        v2.3 - Your patch still has ZookeeperConsumerConnector changes in it, so it did not apply cleanly.

        v2.4 - On checking the replication factor: if we don't allow having a different replication factor for the new partitions
        we should not even expose it as an option.

        v2.5 - AddPartitionsListener: no need to change it now, but just a comment: we can directly parse the replica assignment
        from the data object (instead of reading from zookeeper again) right?

        v2.6 - On moving getManualReplicaAssignment to AdminUtils - I think it would be good to do that here, but either way is
        fine.

        Sriram Subramanian added a comment -

        v2.4 - The reason to expose it is for manual replica assignment. It is more explicit to specify the replication factor and the assignments for those partitions.
        Rebased without the ZookeeperConsumerConnector change.

        Sriram Subramanian made changes -
        Attachment KAFKA-347-v2-rebased.patch [ 12593554 ]
        Jun Rao added a comment -

        Thanks for patch v2. Looks good overall. Some comments.

        20. AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(): The info logging should be different, depending on whether the topic is created or updated. Also, fix the indentation in the else clause.

        21. AddPartitionsCommand:
        21.1 remove unused imports
        21.2 Is there any value in having the "replica" option? It seems that it should always be the existing replication factor.
        21.3 For the "replica-assignment-list" option, could we make it clear in the description that this is for newly added partitions.
        21.4 getManualReplicaAssignment(): We need to make sure the replication factor is the same as the existing one.

        22. KafkaController.onNewTopicCreation(): Could you explain why the onNewPartition statement is moved to before the watcher registration? Normally, in order not to miss any watchers, one needs to register the watcher before reading the associated nodes in ZK.

        23. PartitionStateMachine.AddPartitionsListener.handleDataChange():
        23.1 In the following statement, we are actually returning all replica assignments.
        val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
        23.2 Instead of using controllerContext.partitionLeadershipInfo to filter out existing partitions, it's probably better to use controllerContext.partitionReplicaAssignment, since leaders may not always exist.
        23.3 In the error logging, could we add the affected data path?

        24. AddPartitionsTest: remove unused imports

        25. Did we do any test to make sure that existing consumers can pick up the new partitions?

        26. The patch needs to be rebased.

        Sriram Subramanian added a comment -

        20. Indentation seems fine to me.
        21.2 It is present to make manual assignment more clear.
        25. Yes, the test was done. I will do another sanity check after the patch is committed.

        Sriram Subramanian made changes -
        Attachment KAFKA-347-v3.patch [ 12593566 ]
        Sriram Subramanian made changes -
        Attachment KAFKA-347-v4.patch [ 12593613 ]
        Sriram Subramanian added a comment -

        Added a script for addpartitions.

        Sriram Subramanian made changes -
        Attachment KAFKA-347-v5.patch [ 12593623 ]
        Jun Rao added a comment -

        Thanks for the patch. Committed to 0.8. Could you also provide a patch for trunk, since in trunk all topic-related commands are consolidated into a kafka-topic tool?

        Cosmin Lehene added a comment -

        Is there some operational documentation on how to use this?

        Jun Rao added a comment -

        You just need to run bin/kafka-add-partitions.sh. We will add the docs when 0.8 final is released.
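
        For reference, an invocation would presumably look something like the line below; the option names are assumptions based on the options discussed in this thread (number of partitions to add, an optional replica-assignment-list), not confirmed syntax.

          bin/kafka-add-partitions.sh --topic my-topic --partition 2 --zookeeper localhost:2181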

        Sriram Subramanian made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Sriram Subramanian made changes -
        Status Resolved [ 5 ] Closed [ 6 ]

          People

          • Assignee: Sriram Subramanian
          • Reporter: Jun Rao
          • Votes: 0
          • Watchers: 9
