KAFKA-990: Fix ReassignPartitionsCommand and improve usability

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      1. The tool does not register for IsrChangeListener on controller failover.
      2. There is a race condition where the previous listener can fire on controller failover and the replicas can be in ISR. Even after re-registering the ISR listener after failover, it will never be triggered.
      3. The input to the tool is a static list, which is very hard to use. To improve this, as a first step the tool needs to take a list of topics and a list of brokers to do the assignment to, and then generate the reassignment plan.

      Attachments

      1. KAFKA-990-v1.patch
        22 kB
        Sriram Subramanian
      2. KAFKA-990-v1-rebased.patch
        27 kB
        Sriram Subramanian
      3. KAFKA-990-v2.patch
        33 kB
        Sriram Subramanian
      4. KAFKA-990-v3.patch
        36 kB
        Sriram Subramanian

        Activity

        Neha Narkhede added a comment -

        Thanks for patch v1. Could you please fix the compilation errors though?

        Neha Narkhede added a comment - edited

        Thanks for the rebased patch, Sriram. Overall, the changes look great. +1. One minor suggestion -

        ReassignPartitionsCommand

        For determining the replication factor for replica assignment, can we just use the first or last partition in the map instead of relying on partition id 0? That way, if we change the assumption that partition ids always start from 0, this will not break:
        topicInfo._2.head._2.size instead of topicInfo._2.get(TopicAndPartition(topicInfo._1, 0)).get.size

        If you are ok with this suggestion, I can make it on checkin.
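        Neha's suggestion can be sketched as follows. The types and values here are simplified stand-ins for illustration, not the tool's actual data structures:

```scala
// Simplified stand-in for the tool's types; for illustration only.
case class TopicAndPartition(topic: String, partition: Int)

// A topic's current assignment: partition -> replica ids.
// The partition ids here deliberately do not start at 0.
val assignment: Map[TopicAndPartition, Seq[Int]] = Map(
  TopicAndPartition("t1", 3) -> Seq(1, 2),
  TopicAndPartition("t1", 4) -> Seq(2, 3)
)

// Brittle: assumes a partition with id 0 exists (this would throw here).
// val replicationFactor = assignment(TopicAndPartition("t1", 0)).size

// Robust: any entry works, so take the first one.
val replicationFactor = assignment.head._2.size
```

        Since every partition of a topic has the same replication factor, any entry of the map is as good as partition 0, and the `head` form keeps working if partition ids ever start above 0.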

        Also, it seems that #2 in the description above was not really a problem. This is because onPartitionReassignment checks areReplicasInIsr and hence restarts the reassignment correctly. This is however not true if we hit #1, which is a real issue.

        Joel Koshy added a comment -
        • Topics to move json file format seems unnecessarily complicated. Why not just a JSON array?
        • Use CommandLineUtils.checkRequiredArgs
        • May be helpful to also print out the existing partition assignment and the final assignment.
        • "dryrun" to "dry-run", which I think is the spelling Unix tools like patch tend to use.
        • line 88: use head instead of assuming 0 exists (start partition id could be != 0)

        I did not finish going through all the changes in the controller, but thought I would put in my comments so far.
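        On Joel's first point: the thread does not show the original file format, but the simplified version he proposes would presumably be just a JSON array of topic names. A hypothetical sketch (the topic names are made up):

```json
["page-views", "clicks", "search-queries"]
```

        Paired with a broker list option, such a file is all the tool needs in order to generate a reassignment plan.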

        Joel Koshy added a comment -

        Can you elaborate on the change to shutdownBroker in KafkaController? I
        think we need to include shutting down brokers because the previous shutdown
        attempt may have been incomplete due to no other brokers in ISR for some
        partition which would have prevented leader movement. Subsequent attempts
        would now be rejected.

        Good catches on the controller failover. Agree with Neha that #2 is not a
        problem for replicas that are in ISR, however, we do need to re-register the
        ISR change listener for those replicas that are in ISR.

        Finally, we should probably open a separate jira to implement a feature to
        cancel an ongoing reassignment given that it is a long-running operation.
        The dry-run option reduces the need for this but nevertheless I think it's a
        good feature to support in the future.

        Joel Koshy added a comment -

        Looks like I might have looked at the wrong patch. I'll review this again this weekend.

        Joel Koshy added a comment -

        The rebased patch looks good - the shutdown changes I was referring to were in v1.

        +1 on the rebased patch - we can fix the minor comments either on check-in or in a separate jira.

        Guozhang Wang added a comment -

        Looks good to me overall. One question though: in shutdownBroker, should we check liveOrShuttingDownBrokerIds or liveBrokerIds? I saw it going back and forth, and hence am a little confused about which criterion should be applied here.

        Joel Koshy added a comment -

        It should be liveOrShuttingDownBrokerIds. This is required because a controlled shutdown attempt may
        fail - if there are no other brokers in ISR for a partition led by the broker being shutdown. In this case we
        would want to proceed with a retry (if there are retries left).
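        Joel's point can be illustrated with a small sketch. The function name and signature are hypothetical, not the controller's actual API:

```scala
// Hypothetical sketch of the shutdown eligibility check; the real
// controller tracks considerably more state than this.
def canStartControlledShutdown(brokerId: Int,
                               liveBrokerIds: Set[Int],
                               shuttingDownBrokerIds: Set[Int]): Boolean = {
  // A broker whose first shutdown attempt failed (e.g. it still led a
  // partition with no other replica in ISR) remains in the shutting-down
  // set. Checking only liveBrokerIds would reject the retry; checking
  // the union lets the retry proceed.
  (liveBrokerIds ++ shuttingDownBrokerIds).contains(brokerId)
}
```

        In other words, the union is what makes retries of a partially completed controlled shutdown possible at all.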

        Jun Rao added a comment -

        Thanks for the patch. Looks good overall. Some comments.

        1. KafkaController:
        1.1 onPartitionReassignment(): The comment above the function needs to be updated. For example, we no longer register the ISR listener here.
        1.2 initiateReassignPartitionForTopic(): We fail the reassignment if not all brokers in RAR are alive. I am wondering if this is necessary. A broker can go down when the reassignment process is in progress and we still need to handle this case.
        1.3 Should watchIsrChangesForReassignedPartition() be private?

        2. ReassignPartitionsCommand
        2.1 It seems that we don't allow the options "topics-to-move-json-file" and "manual-assignment-json-file" to co-exist. Could we add an explicit check and output an appropriate message?
        2.2 If "broker-list" is not specified, it seems that we should default to the current list of live brokers.
        2.3 Could we somehow make dryRun the default behavior? In other words, the user has to add another option to disable dry run.

        3. Since the reassignment process requires fetching old data and may pollute the pagecache, do you see any performance impact to produce/fetch request latency when the reassignment is in progress?
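        Jun's point 2.3 (dry run as the default) can be sketched with a tiny option check. The "--execute" flag name is an assumption for illustration, not necessarily what the patch adopted:

```scala
// Sketch only: the tool stays in dry-run mode unless the user
// explicitly opts in to executing the reassignment.
def isDryRun(args: Array[String]): Boolean = !args.contains("--execute")
```

        Making the destructive path opt-in means a user who forgets a flag merely prints the plan instead of kicking off a long-running data movement.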

        Neha Narkhede added a comment -

        1.2 If a broker goes down after the reassignment has started, it will not enter the ISR. So the reassignment will not complete. If the replica fails after the 2nd stage of reassignment is complete, it will get handled through the normal logic of handling failed replicas since the reassignment is complete at that point. I'm not sure there is any value in starting an expensive process like reassignment of partitions when the target replicas are not even alive.
        3. This is the same problem we have if we replace the broker machine and bring up another broker with the same id. I think the fix is to somehow throttle replica fetches. This is a good idea; can we file a separate JIRA for this?

        Swapnil Ghike added a comment - edited

        The rebased patch also failed for me on 0.8 HEAD

        $patch -p1 --dry-run < ~/Downloads/KAFKA-990-v1-rebased.patch
        patching file core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
        patching file core/src/main/scala/kafka/utils/ZkUtils.scala
        Hunk #1 succeeded at 620 (offset 42 lines).
        patching file core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
        Hunk #1 FAILED at 29.
        Hunk #2 FAILED at 81.

        Swapnil Ghike added a comment -

        v1 works with git apply.

        Sriram Subramanian added a comment -

        Neha - fixed what you suggested

        Jun -

        1. KafkaController:
        1.1 done
        1.2 We can fail for now; we can revisit this later.
        1.3 done

        2. ReassignPartitionsCommand
        2.1 I did not do that as it makes the code ugly and it does not cause any harm. Let me know if you feel strongly about this.
        2.2 I think it is safer to be explicit instead of using the live brokers for the move and causing perf issues.
        2.3 done
        3. Polluting the page cache is debatable. We could do the log appends on the follower bypassing the cache, but when the follower becomes the leader, that could cause a lot of IO. Another option is to throttle the rate at which the appends happen on the follower, which reduces the sudden influx of messages at the follower and fetch requests at the leader. Both of these are outside the scope of this JIRA.

        Swapnil Ghike added a comment -

        Comments on v1:

        I think a much clearer name for this tool would be ReassignReplicasCommand.

        Admin tool:
        11. I think the tool should also have a verify/validate option that you can run after the replica reassignment has been completed. As of now, the reassignment of a certain partition can fail and the admin won't know without looking at the controller log.
        12. It would be good to make dry-run the default behaviour.
        13. topics could be a json array, but I don't have a strong opinion one way or the other.
        14. We should explicitly check that the options for the manual replica assignment and the assignment using brokerList are not allowed at the same time. We should also check that the brokerList option is always provided with the topicsToMoveJsonFile option.
        15. The "failed" messages could probably go to System.err.
        16. We should probably not use the partition Id 0 in calling assignReplicasToBrokers, can we instead use

        { val topicAndPartition = groupedByTopic.get(topicInfo._1).get.get(0); val replicationFactor = topicInfo._2.get(topicAndPartition).get.size }

        ?
        17. dryrun --> dry-run? I just remember seeing the latter more often across other tools.

        On the controller:
        21. I think a clearer name for initiateReassignPartitionForTopic would be initiateReassignReplicasForTopicPartition; and similar renames if any.
        22. We should fix the comment in ReassignPartitionsIsrChangeListener
        23. We should update controllerContext.partitionReplicaAssignment only once in KafkaController.updateAssignedReplicasForPartition(). There is an extra over-write as of now.
        24. We should batch the requests in KafkaController.StopOldReplicasOfReassignedPartition()
        25. We should call startNewReplicasForReassignedPartition directly in initiateReassignPartitionForTopic instead of calling onPartitionReassignment. As of now, every time areReplicasInIsr returns false, the controller will call startNewReplicasForReassignedPartition and then log a StateChangeFailedException, because the replicas were already in the new state. This exception will be logged in every call of onPartitionReassignment except for the first call.
        26. We should remove the false condition from onPartitionReassignment
        27. Currently, for each partition that is reassigned, controller deletes the /admin/reassign_partitions zk path, and populates it with a new list with the reassigned partition removed from the original list. This is probably an overkill, and we can delete the zk path completely once the reassignment of all partitions has completed successfully or in error. Even if there was a controller failover when the reassignment was in progress, the new controller should be able to decide which partitions have already been reassigned and which have not been in initiateReassignPartitionForTopic.

        Jun Rao added a comment -

        Thanks for patch v2. A few more comments.

        2.1 I think it's better to guard this in the command line. The issue is that if a user provided both options, it's not clear which one takes precedence.
        2.2 In that case, we should make sure that brokerList is a mandatory field (like zkConnect).

        30. KafkaController.initializeAndMaybeTriggerPartitionReassignment(): The following comment is weird.
        // need to call method

        31. Related to Swapnil's comment in #11, currently, the tool finishes after the ZK path is created. It would be useful to add an option to check the state of partition reassignment so that we know either all assignments have completed or the set of partitions that are remaining.

        Sriram Subramanian added a comment -

        2.1 will do so.
        2.2 We cannot make it mandatory. It is not required when an explicit list is specified; when only topics are specified, we do make it mandatory.

        31. There is already a tool for that. It is called CheckReassignmentStatus.

        Guozhang Wang added a comment -

        Just one more comment: as for Swapnil's 16, currently partition id 0 is also used for AddPartitionCommand. Shall we change that also?

        Sriram Subramanian added a comment -
        • made the dry run the default
        • added some more input validations for the tool
        • some renaming to the controller methods

        Swapnil -

        23. We seem to be updating only once. Let me know if that is not the case.
        24. It is hard to batch with the way we have the code now. The handleStateChange works per topic partition.
        25. That would cause us to explicitly invoke startNewReplicasForReassignedPartition in each case outside onPartitionReassignment which is hard to maintain.
        26. Same comment as above
        27. This is a lot more than what we want to do in 0.8. The issue is if we do not update, we need to add more checks to ensure it is already done or has failed. We can try to optimize that in trunk.

        Sriram Subramanian added a comment -

        Guozhang - We have not merged AddPartitionCommand to trunk yet. We plan to do so once we merge the 0.8 code to trunk.

        Neha Narkhede added a comment -

        +1 on v3. Also, it will be good to file a separate JIRA for Swapnil's suggestion #27

        Neha Narkhede added a comment -

        Thanks for the patches, committed v3 to 0.8


  People

  • Assignee: Sriram Subramanian
  • Reporter: Sriram Subramanian
  • Votes: 0
  • Watchers: 7