KAFKA-42

Support rebalancing the partitions with replication

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels:

      Description

      As new brokers are added, we need to support moving partition replicas from one set of brokers to another, online.

      Attachments

      1. KAFKA-42_2014-07-24_15:36:04.patch
        1.24 MB
        Ivan Lyutov
      2. KAFKA-42_2014-07-24_15:37:26.patch
        1.24 MB
        Ivan Lyutov
      3. KAFKA-42_2014-12-29_16:36:41.patch
        2 kB
        Parth Brahmbhatt
      4. KAFKA-42.patch
        0.0 kB
        sungjubong
      5. kafka-42-v1.patch
        103 kB
        Neha Narkhede
      6. kafka-42-v2.patch
        119 kB
        Neha Narkhede
      7. kafka-42-v3.patch
        115 kB
        Neha Narkhede
      8. kafka-42-v4.patch
        116 kB
        Neha Narkhede
      9. kafka-42-v5.patch
        152 kB
        Neha Narkhede


          Activity

          Neha Narkhede added a comment -

          This is a pretty tricky feature. Since it involves multiple state changes before reassignment can be marked complete, there are many failure conditions to think about, and recovery needs to be handled correctly.

          1. Admin tool changes
          1.1 Added a new admin command reassign-partition. Right now, it handles one partition, since I thought the failure/exit conditions and error messages are simpler to handle. But if people think we should add multiple partitions support in the same command invocation, that is fine too.
          1.2 Added a new reassignPartition(topic, partition, RAR) API that registers a data change listener on /admin/reassign_partitions path and then creates the /admin/reassign_partitions={} path in zookeeper. It waits until that path is deleted from zookeeper. Once it is deleted, it checks if AR == RAR. If yes, it reports success otherwise failure.
          1.3 Added a shutdown hook to handle command cancellation by the admin. In this case, it checks if reassignment was completed or not and logs the output accordingly.
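
          For illustration, the reassignPartition flow in 1.2 boils down to something like the sketch below. This is not the patch's code: the JSON payload shape, the polling loop (standing in for the data change listener), and the currentAssignedReplicas callback are all assumptions made for the example.

          import org.I0Itec.zkclient.ZkClient

          // Sketch of the admin-side reassignPartition(topic, partition, RAR) flow: write the
          // requested assignment under /admin/reassign_partitions, wait for the controller to
          // delete the path, then report success only if AR now equals RAR.
          def reassignPartition(zkClient: ZkClient,
                                topic: String,
                                partition: Int,
                                rar: Seq[Int],
                                currentAssignedReplicas: () => Seq[Int]): Boolean = {
            val adminPath = "/admin/reassign_partitions"
            // Hypothetical payload shape; the controller watches this path for data changes.
            zkClient.createPersistent(adminPath, s"""{"$topic,$partition": [${rar.mkString(",")}]}""")
            // Poll until the controller deletes the path to mark reassignment complete
            // (the real command registers a data change listener instead of polling).
            while (zkClient.exists(adminPath))
              Thread.sleep(1000)
            currentAssignedReplicas() == rar
          }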

          2. Controller changes

          Reassigning replicas for a partition goes through a few stages -
          RAR = Reassigned replicas
          AR = Original list of replicas for partition

          1. Register listener for ISR changes to detect when the RAR is a subset of the ISR
          2. Start new replicas RAR - AR.
          3. Wait until new replicas are in sync with the leader
          4. If the leader is not in RAR, elect a new leader from RAR
          5. Stop old replicas AR - RAR
          6. Write new AR
          7. Remove partition from the /admin/reassign_partitions path

          The above state change steps are implemented inside the onPartitionReassignment() callback in KafkaController.scala, and are sketched below.
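
          Purely as a reading aid, the seven steps line up roughly as follows; every type and helper here is a made-up stand-in for controller internals, not the real API.

          // Outline of the reassignment steps; ar, rar and isr follow the definitions above,
          // and all function parameters are hypothetical hooks into the controller.
          def onPartitionReassignment(ar: Seq[Int],
                                      rar: Seq[Int],
                                      isr: Seq[Int],
                                      currentLeader: Int,
                                      startReplicas: Seq[Int] => Unit,
                                      electLeaderFrom: Seq[Int] => Unit,
                                      stopReplicas: Seq[Int] => Unit,
                                      writeNewAssignedReplicas: Seq[Int] => Unit,
                                      removeFromAdminPath: () => Unit): Unit = {
            startReplicas(rar.diff(ar))               // step 2: start new replicas RAR - AR
            if (rar.forall(isr.contains)) {           // step 3: proceed once all of RAR is in sync
              if (!rar.contains(currentLeader))       // step 4: move leadership into RAR if needed
                electLeaderFrom(rar)
              stopReplicas(ar.diff(rar))              // step 5: stop old replicas AR - RAR
              writeNewAssignedReplicas(rar)           // step 6: AR becomes RAR
              removeFromAdminPath()                   // step 7: signal completion to the admin tool
            }
            // Step 1, the ISR change listener, re-invokes this callback once RAR is a subset of the ISR.
          }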

          3. Partition Reassignment failure cases

          Broadly there are 2 types of failures we need to worry about -
          1. Controller failover
          2. Runtime error at the broker hosting the replica

          Let's go through the failure cases and recovery -
          1. If the controller fails over between steps 1 and 2, the new controller on startup will read the non-empty admin path and just restart the partition reassignment process from scratch
          2a. If the controller fails over between steps 2 and 3 above, the new controller will check if the new replicas are in sync with the leader or not. In either case, it will resume partition reassignment for the partitions listed in the admin path
          2b. If, for some reason, the broker is not able to start the replicas, the isr listener for reassigned partitions will not trigger. So, the controller will not resume partition reassignment process for that partition. After some time, the admin command can be killed and it will report failure and delete the admin path so it can be retried.
          3. If the controller fails over between steps 4 and 5, the new controller will realize that the new replicas are already in sync. If the new leader is part of the new replicas and is alive, it will not trigger leader re-election. Else it will re-elect the leader from amongst the live reassigned replicas.
          4a. If the controller fails over between steps 5 and 6, the new controller resumes partition reassignment and repeats steps 4 onwards
          4b. If, for some reason, the broker does not complete the leader state change, the partition after reassignment will be offline. This is a problem we have today even for leader election of newly created partitions. The controller doesn't wait for an acknowledgement from the broker for the make-leader state change. Nevertheless, the broker can fail even after sending a successful ack, so there isn't much value in waiting for an ack. However, I think the leader broker should expose an mbean to signify the availability of a partition. If people think this is a good idea, I can file a bug to fix this.
          5. If the controller fails over between steps 6 and 7, it deletes the partition from the admin path, marking the completion of this partition's reassignment. The partition reassignment zookeeper listener should record a partition as to-be-reassigned only if RAR is not equal to AR.
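
          The recovery story above leans on one idea: the /admin/reassign_partitions path is the source of truth, and the reassignment steps are safe to re-run. A minimal sketch of the failover behaviour, with invented helper names:

          // On controller failover, any partition still listed under /admin/reassign_partitions
          // simply has its reassignment resumed; the per-step checks make re-running safe.
          def resumeReassignmentsOnFailover(
              readAdminPath: () => Map[(String, Int), Seq[Int]],     // (topic, partition) -> RAR
              resumeReassignment: ((String, Int), Seq[Int]) => Unit): Unit = {
            for ((topicAndPartition, rar) <- readAdminPath())
              resumeReassignment(topicAndPartition, rar)
          }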

          4. PartitionReassignedListener

          Starts the partition reassignment process unless -
          1. The partition does not exist
          2. New replicas are the same as existing replicas
          3. Any replica in the new set of replicas is dead

          If any of the above conditions is satisfied, it logs an error and removes the partition from the list of reassigned partitions, notifying the admin command about the failure/completion.
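
          A sketch of those pre-checks, reading condition 1 as "the partition must already exist" (consistent with review comment 1.1 and the negative test case below); the names are illustrative only.

          // The listener starts reassignment only when none of the abort conditions holds.
          def canStartReassignment(partitionExists: Boolean,
                                   currentReplicas: Seq[Int],
                                   newReplicas: Seq[Int],
                                   liveBrokerIds: Set[Int]): Boolean =
            partitionExists &&
              currentReplicas != newReplicas &&
              newReplicas.forall(liveBrokerIds.contains)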

          5. PartitionLeaderSelector

          Added a self transition on the OnlinePartition state change. This is because, with cluster expansion and preferred replica leader election features, we need to move the leader for online partitions as well.

          Added a partition leader selector module, since we have 3 different ways of selecting the leader for a partition -
          1. Offline leader selector - Pick an alive in-sync replica as the leader. Otherwise, pick an alive assigned replica
          2. Reassigned partition leader selector - Pick one of the alive in-sync reassigned replicas as the new leader
          3. Preferred replica leader selector - Pick the preferred replica as the new leader
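
          To make the three selection policies concrete, here is a toy version of the selector interface. The real PartitionLeaderSelector operates on controller context and LeaderAndIsr, so treat this purely as an illustration of the selection rules.

          // Illustrative selector interface; broker ids stand in for replicas.
          trait LeaderSelector {
            def selectLeader(assignedReplicas: Seq[Int], isr: Seq[Int], liveBrokers: Set[Int]): Option[Int]
          }

          // 1. Offline: prefer an alive in-sync replica, otherwise any alive assigned replica.
          object OfflineLeaderSelector extends LeaderSelector {
            def selectLeader(ar: Seq[Int], isr: Seq[Int], live: Set[Int]): Option[Int] =
              isr.find(live.contains).orElse(ar.find(live.contains))
          }

          // 2. Reassigned: pick an alive in-sync replica from the reassigned replica set (RAR).
          class ReassignedPartitionLeaderSelector(rar: Seq[Int]) extends LeaderSelector {
            def selectLeader(ar: Seq[Int], isr: Seq[Int], live: Set[Int]): Option[Int] =
              rar.find(r => live.contains(r) && isr.contains(r))
          }

          // 3. Preferred replica: the first assigned replica, if it is alive and in sync.
          object PreferredReplicaLeaderSelector extends LeaderSelector {
            def selectLeader(ar: Seq[Int], isr: Seq[Int], live: Set[Int]): Option[Int] =
              ar.headOption.filter(r => live.contains(r) && isr.contains(r))
          }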

          6. Replica state machine changes
          Added 2 new states to the replica state machine -
          1. NewReplica : The controller can create new replicas during partition reassignment. In this state, a replica can only receive a become-follower state change request. Valid previous state is NonExistentReplica
          2. OnlineReplica : Once a replica is started and is part of the assigned replicas for its partition, it is in this state. In this state, it can receive either become-leader or become-follower state change requests. Valid previous states are NewReplica, OnlineReplica or OfflineReplica
          3. OfflineReplica : If a replica dies, it moves to this state. This happens when the broker hosting the replica is down. Valid previous states are NewReplica, OnlineReplica
          4. NonExistentReplica : If a replica is deleted, it is moved to this state. Valid previous state is OfflineReplica
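
          The valid transitions above can be captured in a small table. This is only a sketch of the state machine's checks (the real ReplicaStateMachine also sends the corresponding broker requests), but it matches the assertion messages seen in the test logs further down.

          sealed trait ReplicaState
          case object NewReplica         extends ReplicaState
          case object OnlineReplica      extends ReplicaState
          case object OfflineReplica     extends ReplicaState
          case object NonExistentReplica extends ReplicaState

          // Valid previous states for each target state, as listed above.
          val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
            NewReplica         -> Set(NonExistentReplica),
            OnlineReplica      -> Set(NewReplica, OnlineReplica, OfflineReplica),
            OfflineReplica     -> Set(NewReplica, OnlineReplica),
            NonExistentReplica -> Set(OfflineReplica)
          )

          // A transition is legal only if the current state is a valid previous state of the target.
          def transition(current: ReplicaState, target: ReplicaState): ReplicaState = {
            require(validPreviousStates(target).contains(current),
              s"Replica should be in ${validPreviousStates(target).mkString(",")} before moving to $target state. Instead it is in $current state")
            target
          }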

          7. Added 6 unit test cases to test -
          1. Partition reassignment with leader of the partition in the new list of replicas
          2. Partition reassignment with leader of the partition NOT in the new list of replicas
          3. Partition reassignment with existing assigned replicas NOT overlapping with new list of replicas
          4. Partition reassignment for a non-existent partition. This is a negative test case
          5. Partition reassignment for a partition that was completed up to step 6 by the previous controller. This tests whether, after controller failover, the new controller marks that partition's reassignment as completed.
          6. Partition reassignment for a partition that was completed up to step 3 by the previous controller. This tests whether, after controller failover, the new controller handles leader re-election correctly and completes the rest of the partition reassignment process.

          Neha Narkhede added a comment -

          Patch v1 contains a fix for KAFKA-525

          Jun Rao added a comment -

          Thanks for patch v1. Looks good overall. Some comments:

          1. ReassignPartitionsCommand:
          1.1 If a partition doesn't exist, we should fail the operation immediately without updating ReassignPartitionsPath.
          1.2 I think it would be useful to support migrating multiple topics and partitions. We can just take a JSON file that describes the new replicas as input.
          1.3 If ReassignPartitionsPath already exists, we should quit immediately and not overwrite the path. This means that we will only allow 1 outstanding cluster rebalance at a given point of time, which is ok as long as the admin command allows multiple topics/partitions to be specified.

          2. Currently, we fail the partition reassignment operation if any broker in RAR is down during initialization. However, brokers in RAR can go down after initialization. So, it would be good if we could handle RAR failures. Probably the only change needed is that when a broker comes online, we need to start those replicas in RAR too.

          3. The logic now gets more complicated with the reassignment logic. Could we describe how it works in a comment?

          4. PartitionLeaderSelector.selectLeader(): describe what the return value is in the comment.

          5. ReassignedPartitionsIsrChangeListener.handleDataChange(): The following statement is weird. Only controller can change leaders and the controller always updates the leader cache every time a leader is changed. So, there shouldn't be a need for updating the leader cache on ZK listeners.
          controllerContext.allLeaders.put((topic, partition), leaderAndIsr.leader)

          6. KafkaController.onPartitionReassignment(): Could we put the logic that makes sure all replicas in RAR are in the ISR inside onPartitionReassignment()? Currently, that logic is duplicated in 2-3 places and is always followed by a call to onPartitionReassignment(). If we do this, do we still need ReassignedPartitionsContext.areNewReplicasInIsr?

          7. ReplicaStateChangeMachine:
          7.1 NonExistentReplica: The controller holds on to all replicas in this state. Is this necessary? Can we just remove them from the replicaState map?
          7.2 In the following code, we don't really need to read from ZK and can use the cached data.
          case _ =>
          // check if the leader for this partition is alive or even exists
          // NOTE: technically, we could get the leader from the allLeaders cache, but we need to read zookeeper
          // for the ISR anyways
          val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)

          8. AdminTest:
          8.1 testPartitionReassignmentWithLeaderInNewReplicas: How do we make sure that replica 0 is always the leader?
          8.2 testResumePartitionReassignmentThatWasCompleted: Towards the end, the comment says leader should be 2, but there is no broker 2 in the test.

          9. ControllerBrokerRequestBatch: Should we rename the two addRequestForBrokers to addLeaderAndIsrRequestForBrokers and addStopReplicaRequestForBrokers respectively?

          10. PartitionOfflineException,StateChangeFailedException: We can probably change the implementation to use RuntimeException(message, throwable) directly.

          11. LeaderElectionTest.testLeaderElectionAndEpoch(): Not sure if the change is correct. If there is no leadership change, leader epoch shouldn't change, right?

          Neha Narkhede added a comment -

          1. ReassignPartitionsCommand:
          1.1 Makes sense, changed that.

          1.2 I think that makes sense. Thinking about this more, I guess it is not such a good idea to block the admin command until all the partitions are successfully reassigned. I changed the reassign partitions admin command to issue the partition reassignment request only if that path doesn't already exist. This protects against accidentally overwriting the zookeeper path. I also added a check-reassignment-status admin command that will report whether the reassignment status of a partition is completed/failed/in progress. Also, another thing to be careful about with a batch reassignment API is to avoid piling up important state change requests on the controller while it reassigns multiple partitions. Since reassignment of partitions is not an urgent state change, we should give up the controller lock after each partition is reassigned. That will ensure that other state changes can sneak in, if necessary.

          1.3 Yes, forgot to include that in v1 patch.

          2. Initially, I thought the admin could just re-run the partition reassignment command, but I realize that it involves one manual step.

          3, 4 Sure

          5. Good point, removed it.

          6. This check is not done on every single invocation of onPartitionReassignment; it is done on controller failover and in the ISR change listener. It is not required when the partition reassigned callback triggers. But I think it is a good idea to move it into the callback, just in case we have not covered all scenarios where the check should be done.

          7.1 While changing the state of a replica to NewReplica, we need to ensure that it was in the NonExistentReplica state. We can remove the replica from the replicaState map after it moves to the NonExistentReplica state explicitly, but there is a chance it will be added back to the map again. This can happen if we re-start the replica after stopping it. But, since this is infrequent, I made this change.

          7.2 We do not cache the ISR, which is required for the controller to be able to send a leader and isr request to the broker.
          Besides, this operation is only invoked when a new broker is started or the controller fails over. Both of these operations are rare enough that we don't need to worry about optimizing this.

          8.1 There is a very good chance that it will be. This is because, we always pick the first alive assigned replica as the leader. Since replica 0 is the first assigned replica and is never shut down during the test, it will be the leader. Even if, due to some rare zookeeper session expiration issue, it is not the leader, the test will not fail.

          8.2 The comment is redundant there, so I removed it

          9, 10. Good point, fixed it

          11. It is correct since the controller increments the epoch for isr changes made by itself.

          Jun Rao added a comment -

          Thanks for patch v2. Some more comments:

          20. ReassignPartitionsCommand:
          20.1 Could we add a description of the format of the JSON file in the command line option?
          20.2 If partitionsToBeReassigned is empty, should we just fail the command?
          20.3 reassignPartitions(): Instead of checking the existence of ReassignPartitionsPath and then writing to ZK, it's better to use ZkUtils.createPersistentPath(), which throws an exception if the node already exists. This will prevent the corner case where the path is created just after the existence check.
          20.4 createReassignedPartitionsPathInZK: It seems that each call to this method just overwrites ReassignPartitionsPath with 1 partition's assignment. So we will lose the assignments of all partitions except the last one?

          21. CheckReassignmentStatus: It's better to move checkIfReassignmentSucceeded and checkIfPartitionReassignmentSucceeded from ZkUtils to CheckReassignmentStatus since they are only used here and ZkUtils is getting big.

          22. KafkaController.onBrokerStartup() : It seems that we can get partitionsBeingReassigned from the cache in controllerContext, instead of from ZK.

          23. PartitionStateMachine.initializeLeaderAndIsrForPartition(): When writing the initial leaderAndIsr path for a new partition, there is no need to read the path first to make sure that it doesn't exist. createPersistentPath will throw an exception if the path exists.

          Neha Narkhede added a comment -

          20. ReassignPartitionsCommand:
          20.1, 20.2 Sure, that is a good idea
          20.3 For this corner case to happen, another instance of the admin command would have to run at the right time. If that happens, both the admin commands might see that the path doesn't exist and try to create it. At this point, one of the admin commands will get an error and it will exit.
          20.4 Good catch, that is a bug. Initially, I wrote the entire map of all partitions using that API. But later, for per-partition sanity checks, I changed it to be invoked for every partition, and that probably introduced the bug.

          21. They are used in AdminTest as well, but this makes sense.

          22, 23. Included these optimizations.

          Jun Rao added a comment -

          Thanks for patch v3. Looks good to me overall. Just one comment:

          20.3 The problem is that reassignPartitions() uses updatePartitionReassignmentData, which in turn uses updatePersistentPath. updatePersistentPath won't throw an exception if a node already exists. So, what could happen is that 2 admin commands are issued at the same time. Both pass the existence test of the ZK path. One command writes its data in the reassignment path first. The other one then overwrites it. Now, both commands appear to have completed successfully. Using ZkUtils.createPersistentPath() instead of updatePersistentPath() would prevent this since the former throws an exception if the path already exists.
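
          To make the race concrete, here is a sketch of the two patterns using the raw ZkClient calls; ZkUtils wraps calls like these, and the helper shapes below are assumptions made for illustration.

          import org.I0Itec.zkclient.ZkClient
          import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}

          // Racy pattern (roughly what an existence check plus an updatePersistentPath-style write does):
          // two admin commands can both pass the exists() check, and the later write silently
          // overwrites the earlier reassignment request.
          def writeReassignmentRacy(zk: ZkClient, path: String, json: String): Unit = {
            if (!zk.exists(path)) {
              try zk.writeData(path, json)
              catch { case _: ZkNoNodeException => zk.createPersistent(path, json) }
            }
          }

          // Safe pattern: rely on createPersistent throwing if the node exists, so exactly one of
          // two concurrent commands succeeds and the other fails loudly instead of overwriting.
          def writeReassignmentSafe(zk: ZkClient, path: String, json: String): Unit = {
            try zk.createPersistent(path, json)
            catch {
              case _: ZkNodeExistsException =>
                println(s"Partition reassignment already in progress, not overwriting $path")
            }
          }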

          Neha Narkhede added a comment -

          20.3 Good point, I see what you are saying. Fixed it

          Jun Rao added a comment -

          Thanks for patch v4. AdminTest.testResumePartitionReassignmentAfterLeaderWasMoved seems to fail.

          [2012-10-08 21:30:06,005] ERROR [PartitionsReassignedListener on 0]: Error completing reassignment of partition [test, 0] (kafka.controller.PartitionsReassignedListener:102)
          kafka.common.KafkaException: Only replicas out of the new set of replicas 2,3 for partition [test, 0] to be reassigned are alive. Failing partition reassignment
          at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.liftedTree1$1(KafkaController.scala:512)
          at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:495)
          at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:489)
          at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
          at kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:489)
          at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:547)
          at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
          [2012-10-08 21:30:06,280] ERROR [PartitionsReassignedListener on 0]: Error completing reassignment of partition [test, 0] (kafka.controller.PartitionsReassignedListener:102)
          org.I0Itec.zkclient.exception.ZkInterruptedException: java.lang.InterruptedException
          at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:687)
          at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
          at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
          at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:363)
          at kafka.utils.ZkUtils$.getLeaderAndIsrForPartition(ZkUtils.scala:78)
          at kafka.controller.KafkaController.areReplicasInIsr(KafkaController.scala:323)
          at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:183)
          at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.liftedTree1$1(KafkaController.scala:509)
          at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:495)
          at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:489)
          at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
          at kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:489)
          at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:547)
          at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
          Caused by: java.lang.InterruptedException
          at java.lang.Object.wait(Native Method)
          at java.lang.Object.wait(Object.java:485)
          at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1344)
          at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:925)
          at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
          at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
          at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
          at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
          at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
          ... 13 more
          [2012-10-08 21:30:06,377] ERROR [Replica state machine on Controller 3]: Error while changing state of replica 2 for partition [test, 0] to OnlineReplica (kafka.controller.ReplicaStateMachine:102)
          java.lang.AssertionError: assertion failed: Replica 2 for partition [test, 0] should be in the NewReplica,OnlineReplica,OfflineReplica states before moving to OnlineReplica state. Instead it is in NonExistentReplica state
          at scala.Predef$.assert(Predef.scala:91)
          at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
          at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:130)
          at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
          at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
          at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
          at scala.collection.immutable.List.foreach(List.scala:45)
          at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
          at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply$mcVI$sp(KafkaController.scala:187)
          at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
          at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
          at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
          at scala.collection.immutable.List.foreach(List.scala:45)
          at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:186)
          at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
          at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
          at scala.collection.Iterator$class.foreach(Iterator.scala:631)
          at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
          at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
          at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
          at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
          at kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
          at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
          at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
          at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
          at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
          at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
          at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
          at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
          [2012-10-08 21:30:06,379] ERROR [Replica state machine on Controller 3]: Error while changing state of replica 3 for partition [test, 0] to OnlineReplica (kafka.controller.ReplicaStateMachine:102)
          java.lang.AssertionError: assertion failed: Replica 3 for partition [test, 0] should be in the NewReplica,OnlineReplica,OfflineReplica states before moving to OnlineReplica state. Instead it is in NonExistentReplica state
          at scala.Predef$.assert(Predef.scala:91)
          at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
          at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:130)
          at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
          at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
          at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
          at scala.collection.immutable.List.foreach(List.scala:45)
          at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
          at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply$mcVI$sp(KafkaController.scala:187)
          at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
          at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
          at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
          at scala.collection.immutable.List.foreach(List.scala:45)
          at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:186)
          at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
          at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
          at scala.collection.Iterator$class.foreach(Iterator.scala:631)
          at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
          at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
          at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
          at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
          at kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
          at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
          at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
          at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
          at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
          at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
          at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
          at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
          [2012-10-08 21:30:06,381] ERROR [Replica state machine on Controller 3]: Error while changing state of replica 0 for partition [test, 0] to OfflineReplica (kafka.controller.ReplicaStateMachine:102)
          java.lang.AssertionError: assertion failed: Replica 0 for partition [test, 0] should be in the NewReplica,OnlineReplica states before moving to OfflineReplica state. Instead it is in NonExistentReplica state
          at scala.Predef$.assert(Predef.scala:91)
          at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
          at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:156)
          at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
          at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
          at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
          at scala.collection.immutable.List.foreach(List.scala:45)
          at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
          at kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply$mcVI$sp(KafkaController.scala:363)
          at kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply(KafkaController.scala:362)
          at kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply(KafkaController.scala:362)
          at scala.collection.immutable.Set$Set2.foreach(Set.scala:101)
          at kafka.controller.KafkaController.stopOldReplicasOfReassignedPartition(KafkaController.scala:362)
          at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:193)
          at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
          at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
          at scala.collection.Iterator$class.foreach(Iterator.scala:631)
          at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
          at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
          at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
          at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
          at kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
          at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
          at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
          at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
          at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
          at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
          at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
          at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

          Partition should have been reassigned to 0, 1 expected:<List(2, 3)> but was:<List(0, 1)>
          junit.framework.AssertionFailedError: Partition should have been reassigned to 0, 1 expected:<List(2, 3)> but was:<List(0, 1)>
          at junit.framework.Assert.fail(Assert.java:47)
          at junit.framework.Assert.failNotEquals(Assert.java:277)
          at junit.framework.Assert.assertEquals(Assert.java:64)
          at kafka.admin.AdminTest.testResumePartitionReassignmentAfterLeaderWasMoved(AdminTest.scala:361)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
          at java.lang.reflect.Method.invoke(Method.java:597)
          at junit.framework.TestCase.runTest(TestCase.java:164)

          Show
          Jun Rao added a comment - Thanks for patch v4. AdminTest.testResumePartitionReassignmentAfterLeaderWasMoved seems to fail. [2012-10-08 21:30:06,005] ERROR [PartitionsReassignedListener on 0] : Error completing reassignment of partition [test, 0] (kafka.controller.PartitionsReassignedListener:102) kafka.common.KafkaException: Only replicas out of the new set of replicas 2,3 for partition [test, 0] to be reassigned are alive. Failing partition reassignment at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.liftedTree1$1(KafkaController.scala:512) at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:495) at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:489) at scala.collection.immutable.Map$Map1.foreach(Map.scala:105) at kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:489) at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:547) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) [2012-10-08 21:30:06,280] ERROR [PartitionsReassignedListener on 0] : Error completing reassignment of partition [test, 0] (kafka.controller.PartitionsReassignedListener:102) org.I0Itec.zkclient.exception.ZkInterruptedException: java.lang.InterruptedException at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:687) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:363) at kafka.utils.ZkUtils$.getLeaderAndIsrForPartition(ZkUtils.scala:78) at kafka.controller.KafkaController.areReplicasInIsr(KafkaController.scala:323) at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:183) at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.liftedTree1$1(KafkaController.scala:509) at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:495) at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:489) at scala.collection.immutable.Map$Map1.foreach(Map.scala:105) at kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:489) at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:547) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) Caused by: java.lang.InterruptedException at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:485) at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1344) at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:925) at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956) at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103) at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770) at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) ... 13 more [2012-10-08 21:30:06,377] ERROR [Replica state machine on Controller 3] : Error while changing state of replica 2 for partition [test, 0] to OnlineReplica (kafka.controller.ReplicaStateMachine:102) java.lang.AssertionError: assertion failed: Replica 2 for partition [test, 0] should be in the NewReplica,OnlineReplica,OfflineReplica states before moving to OnlineReplica state. 
Instead it is in NonExistentReplica state
    at scala.Predef$.assert(Predef.scala:91)
    at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
    at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:130)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply$mcVI$sp(KafkaController.scala:187)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:186)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2012-10-08 21:30:06,379] ERROR [Replica state machine on Controller 3] : Error while changing state of replica 3 for partition [test, 0] to OnlineReplica (kafka.controller.ReplicaStateMachine:102)
java.lang.AssertionError: assertion failed: Replica 3 for partition [test, 0] should be in the NewReplica,OnlineReplica,OfflineReplica states before moving to OnlineReplica state.
Instead it is in NonExistentReplica state
    at scala.Predef$.assert(Predef.scala:91)
    at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
    at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:130)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply$mcVI$sp(KafkaController.scala:187)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:186)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2012-10-08 21:30:06,381] ERROR [Replica state machine on Controller 3] : Error while changing state of replica 0 for partition [test, 0] to OfflineReplica (kafka.controller.ReplicaStateMachine:102)
java.lang.AssertionError: assertion failed: Replica 0 for partition [test, 0] should be in the NewReplica,OnlineReplica states before moving to OfflineReplica state.
Instead it is in NonExistentReplica state
    at scala.Predef$.assert(Predef.scala:91)
    at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
    at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:156)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
    at kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply$mcVI$sp(KafkaController.scala:363)
    at kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply(KafkaController.scala:362)
    at kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply(KafkaController.scala:362)
    at scala.collection.immutable.Set$Set2.foreach(Set.scala:101)
    at kafka.controller.KafkaController.stopOldReplicasOfReassignedPartition(KafkaController.scala:362)
    at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:193)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Partition should have been reassigned to 0, 1 expected:<List(2, 3)> but was:<List(0, 1)>
junit.framework.AssertionFailedError: Partition should have been reassigned to 0, 1 expected:<List(2, 3)> but was:<List(0, 1)>
    at junit.framework.Assert.fail(Assert.java:47)
    at junit.framework.Assert.failNotEquals(Assert.java:277)
    at junit.framework.Assert.assertEquals(Assert.java:64)
    at kafka.admin.AdminTest.testResumePartitionReassignmentAfterLeaderWasMoved(AdminTest.scala:361)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at junit.framework.TestCase.runTest(TestCase.java:164)
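The assertion failures above come from the replica state machine's precondition check: each target state accepts only a fixed set of previous states, so replaying a reassignment after controller failover trips the check when a replica is still registered as NonExistentReplica. The sketch below is a rough illustration of that pattern, not the actual Kafka source; the allowed-state sets for OnlineReplica and OfflineReplica mirror the sets quoted in the log, while the NewReplica row and the ReplicaStateCheckSketch names are assumptions.

    object ReplicaStateCheckSketch {
      sealed trait ReplicaState
      case object NonExistentReplica extends ReplicaState
      case object NewReplica extends ReplicaState
      case object OnlineReplica extends ReplicaState
      case object OfflineReplica extends ReplicaState

      // Allowed previous states per target state. The OnlineReplica and OfflineReplica
      // rows match the assertion messages above; the NewReplica row is illustrative.
      private val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
        NewReplica     -> Set[ReplicaState](NonExistentReplica),
        OnlineReplica  -> Set[ReplicaState](NewReplica, OnlineReplica, OfflineReplica),
        OfflineReplica -> Set[ReplicaState](NewReplica, OnlineReplica)
      )

      def assertValidTransition(replica: Int, partition: String,
                                current: ReplicaState, target: ReplicaState): Unit = {
        val allowed = validPreviousStates.getOrElse(target, Set.empty[ReplicaState])
        assert(allowed.contains(current),
          s"Replica $replica for partition $partition should be in one of $allowed " +
          s"before moving to $target state. Instead it is in $current state")
      }

      def main(args: Array[String]): Unit = {
        // Mirrors the first failure in the log: a replica the controller never
        // (re-)registered cannot be moved straight to OnlineReplica.
        assertValidTransition(3, "[test, 0]", NonExistentReplica, OnlineReplica)
      }
    }

Running the sketch's main throws the same kind of AssertionError as the log, which is consistent with the controller resuming a reassignment whose replicas were never re-registered after failover.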
          Hide
          Jun Rao added a comment -

          Also, could you make the two new scripts in bin/ executable?

          Hide
          Neha Narkhede added a comment -

          The intermittent test failure happens because the partition reassignment fails to complete with a ZkInterruptedException, probably because the test deliberately triggers a controller failover. Controller failover takes some time to restart the partition reassignment, and the test's timeout was too short to cover that. I fixed the comment in the test assertion and increased the wait time. Now all tests seem to pass after a couple of iterations.

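          For context on the wait-time fix described above, here is a minimal sketch of a poll-until-true wait with a configurable timeout, assuming the test relies on a helper of roughly this shape; the names, signature, and the 2000 ms value are illustrative, not the actual test code.

          object WaitUntilSketch {
            // Poll a condition until it holds or the timeout expires; return the final result.
            def waitUntilTrue(condition: () => Boolean, timeoutMs: Long, pauseMs: Long = 100L): Boolean = {
              val deadline = System.currentTimeMillis() + timeoutMs
              while (System.currentTimeMillis() < deadline) {
                if (condition()) return true
                Thread.sleep(pauseMs)
              }
              condition()
            }

            def main(args: Array[String]): Unit = {
              val start = System.currentTimeMillis()
              // Toy stand-in for "reassignment completed": becomes true after ~500 ms.
              // With a 2000 ms timeout this passes; with a timeout shorter than the
              // controller-failover delay it fails intermittently, which is the shape
              // of the flakiness described in the comment above.
              val reassignmentDone = () => System.currentTimeMillis() - start > 500
              println(s"completed within timeout: ${waitUntilTrue(reassignmentDone, timeoutMs = 2000L)}")
            }
          }

          The point of the fix is to give controller failover enough headroom before the assertion runs, not to change what the test asserts.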
          Hide
          Neha Narkhede added a comment -

          Also, I will make the scripts executable before checking in

          Hide
          Neha Narkhede added a comment -

          This is blocking work on KAFKA-43; if no one has objections to v5, I will commit this today.

          Hide
          Jun Rao added a comment -

          +1 on the patch. I still see AdminTest.testResumePartitionReassignmentAfterLeaderWasMoved fail occasionally with the same error, though it seems to be less frequent now.

          Unfortunately, you will need to rebase. The patch can be committed after the rebase.

          Hide
          Neha Narkhede added a comment -

          I fear I might end up rebasing incorrectly, so I volunteer to apply KAFKA-510 on top of this rather than the other way around. This will require me to revert KAFKA-510, apply KAFKA-42, and then re-apply KAFKA-510.

          Hide
          Neha Narkhede added a comment -

          Committed this

          Hide
          sungjubong added a comment -

          Created reviewboard against branch origin/trunk

          Hide
          Ivan Lyutov added a comment -

          Updated reviewboard against branch apache/0.8.1

          Hide
          Jun Rao added a comment -

          This JIRA is already closed. Is the patch intended for this JIRA?

          Hide
          Guozhang Wang added a comment -

          I think this is just due to the review tool, which uses the magic number "42" when no JIRA number is specified.

          Hide
          Parth Brahmbhatt added a comment -

          Updated reviewboard https://reviews.apache.org/r/29468/diff/
          against branch origin/trunk


            People

            • Assignee:
              Neha Narkhede
            • Reporter:
              Jun Rao
            • Votes: 0
            • Watchers: 6

                Time Tracking

                Estimated: 240h
                Remaining: 240h
                Logged: Not Specified