KAFKA-911: Bug in controlled shutdown logic in the controller leads to the controller not sending out some state change requests

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: controller
    • Labels: None

      Description

      The controlled shutdown logic in the controller first tries to move the leaders off the broker being shut down. Then it tries to remove that broker from the ISR list. During that operation, it does not synchronize on the controllerLock. This causes a race condition while dispatching data using the controller's channel manager.
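
      Below is a minimal sketch of that race in Scala, using hypothetical names (the real controller code differs): leader movement runs under controllerLock, but the ISR shrink dispatches through the shared request batch without taking the same lock.

          import scala.collection.mutable

          object ControlledShutdownRaceSketch {
            private val controllerLock = new Object
            // shared, unsynchronized state standing in for the channel manager's request batch
            private val pendingRequests = mutable.Buffer[String]()

            def moveLeadersOff(brokerId: Int): Unit = controllerLock.synchronized {
              pendingRequests += s"LeaderAndIsrRequest(move leaders off broker $brokerId)"
              sendAndClear()
            }

            def removeFromIsr(brokerId: Int): Unit = {
              // BUG: not synchronized on controllerLock, so this interleaves freely with
              // any other controller thread that uses pendingRequests under the lock
              pendingRequests += s"StopReplicaRequest(broker $brokerId)"
              sendAndClear()
            }

            private def sendAndClear(): Unit = {
              pendingRequests.foreach(println)
              pendingRequests.clear()
            }
          }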

      Attachments

      1. kafka-911-v2.patch (7 kB, Neha Narkhede)
      2. kafka-911-v1.patch (6 kB, Neha Narkhede)

        Activity

        Sriram Subramanian made changes -
        Status: Patch Available → Resolved
        Resolution: Fixed
        Sriram Subramanian added a comment -

        This has been fixed.

        Sriram Subramanian added a comment -

        I suggest we wait for my patch. My patch changes quite a bit of this logic and it just adds to the merge problem.

        Neha Narkhede made changes -
        Attachment: kafka-911-v2.patch
        Neha Narkhede added a comment -

        I agree with Joel's suggestion. Removing the shutting-down brokers from the ISR is better. This patch sends the LeaderAndIsrRequest with the reduced ISR to the new leader for the partitions on the shutting-down brokers. This ensures the leader will remove the shutting-down broker from the ISR in zookeeper. It also makes it unnecessary for the controller to write the shrunk ISR to zookeeper during controlled shutdown.
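
        A hedged sketch of that idea (illustrative names, not the patch itself): for each partition hosted on the shutting-down broker, the controller sends the new leader a leader-and-ISR state whose ISR no longer contains that broker, and the leader then persists the shrunk ISR to zookeeper itself.

            case class LeaderAndIsrSketch(leader: Int, isr: List[Int])

            // drop the shutting-down broker from the ISR before sending it to the new leader
            def reducedLeaderAndIsr(current: LeaderAndIsrSketch, shuttingDownBroker: Int): LeaderAndIsrSketch =
              current.copy(isr = current.isr.filterNot(_ == shuttingDownBroker))

            // e.g. reducedLeaderAndIsr(LeaderAndIsrSketch(leader = 2, isr = List(1, 2, 3)), 3)
            // yields LeaderAndIsrSketch(2, List(1, 2)), carried in the LeaderAndIsrRequest sent to broker 2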

        Joel Koshy added a comment -

        I had to revisit the notes from KAFKA-340. I think this was touched upon there, i.e., the fact that the current implementation's attempt to shrink the ISR may be ineffective for partitions whose leadership has been moved off the current broker - https://issues.apache.org/jira/browse/KAFKA-340?focusedCommentId=13483478&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13483478

        <quote>
        > 3.4 What is the point of sending leader and isr request at the end of shutdownBroker, since the OfflineReplica state
        > change would've taken care of that anyway. It seems like you just need to send the stop replica request with the delete
        > partitions flag turned off, no ?

        I still need (as an optimization) to send the leader and isr request to the leaders of all partitions that are present
        on the shutting down broker so it can remove the shutting down broker from its inSyncReplicas cache
        (in Partition.scala) so it no longer waits for acks from the shutting down broker if a producer request's num-acks is
        set to -1. Otherwise, we have to wait for the leader to "organically" shrink the ISR.

        This also applies to partitions which are moved (i.e., partitions for which the shutting down broker was the leader):
        the ControlledShutdownLeaderSelector needs to send the updated leaderAndIsr request to the shutting down broker as well
        (to tell it that it is no longer the leader) at which point it will start up a replica fetcher and re-enter the ISR.
        So in fact, there is actually not much point in removing the "current leader" from the ISR in the
        ControlledShutdownLeaderSelector.selectLeader.
        </quote>

        and

        https://issues.apache.org/jira/browse/KAFKA-340?focusedCommentId=13484727&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13484727
        (I don't think I actually filed that jira though.)
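
        A rough illustration (not Partition.scala) of why the leader's cached in-sync set matters for num-acks = -1, as described above: a message only counts as committed once every replica in that cached set has fetched past it, so a stale entry for the shutting-down broker stalls acknowledgements.

            // hypothetical helper: broker id -> log end offset the leader has recorded for that replica
            def canCommit(inSyncReplicaEndOffsets: Map[Int, Long], requiredOffset: Long): Boolean =
              inSyncReplicaEndOffsets.values.forall(_ >= requiredOffset)

            // if the shutting-down broker lingers in this map after its fetcher stops,
            // canCommit stays false until the leader shrinks the ISR some other way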

        Neha Narkhede added a comment -

        You are right that we can send the reduced-ISR request to the leader, but that is independent of removing the shutting-down broker from the ISR in zookeeper. I'm arguing that the zookeeper write is unnecessary. To handle the issue you described, we can send a LeaderAndIsrRequest with the reduced ISR just to the leader.

        Jun Rao added a comment -

        If we just stop the replica to be shut down without sending a reduced ISR to the leader, it will take replicaLagTimeMaxMs (defaults to 10s) before the leader realizes that the follower is gone. Before that, no new messages can be committed. The idea of letting the controller send a reduced ISR to the leader is to allow the leader to commit new messages sooner. I'm not very sure the existing logic does this effectively though. It seems to me that it's better if we stop the shutdown replica one at a time after the leader is moved. Maybe Joel can comment?
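
        A hedged sketch of the "organic" shrink mentioned above (illustrative, not the real replica code): the leader only evicts a follower after it has lagged for replicaLagTimeMaxMs, so with acks set to -1 nothing new commits during that window.

            case class FollowerProgress(brokerId: Int, lastCaughtUpTimeMs: Long)

            // keep only followers that have caught up within the lag window (default 10s)
            def organicIsrShrink(isr: Set[FollowerProgress],
                                 nowMs: Long,
                                 replicaLagTimeMaxMs: Long = 10000L): Set[FollowerProgress] =
              isr.filter(f => nowMs - f.lastCaughtUpTimeMs <= replicaLagTimeMaxMs)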

        Neha Narkhede added a comment -

        The testShutdownBroker() test case in AdminTest will fail with this patch, since it assumes that the controlled shutdown logic will shrink the ISR proactively. I will fix the test if the change in this patch of not shrinking the ISR is acceptable.

        Neha Narkhede made changes -
        Status: In Progress → Patch Available
        Neha Narkhede made changes -
        Attachment: kafka-911-v1.patch
        Neha Narkhede added a comment -

        The root cause of this bug was a race condition in the use of the ControllerBrokerRequestBatch, which assumes synchronization at the caller. This patch synchronizes access to the ControllerBrokerRequestBatch while sending out the StopReplicaRequest.
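
        A minimal sketch of that fix, assuming a stand-in batch interface rather than the real ControllerBrokerRequestBatch: every touch of the shared batch now happens while holding the same controllerLock the rest of the controller uses.

            trait RequestBatchSketch {
              def newBatch(): Unit
              def addStopReplicaRequestForBroker(brokerId: Int): Unit
              def sendRequestsToBrokers(): Unit
            }

            class ShutdownStopReplicaSketch(controllerLock: Object, batch: RequestBatchSketch) {
              def stopReplicasOn(brokerId: Int): Unit = controllerLock.synchronized {
                // the batch assumes synchronization at the caller, so the whole
                // build-and-send sequence runs under controllerLock
                batch.newBatch()
                batch.addStopReplicaRequestForBroker(brokerId)
                batch.sendRequestsToBrokers()
              }
            }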

        While working on this and testing the patch, I noticed some inefficiency in the controlled shutdown API. When we move the leader for a partition one at a time, we also remove the broker being shut down from the ISR. This involves at least one read and one write per partition to zookeeper, sometimes more. Besides being slow, this operation is not effective, because the broker being shut down still has live fetchers to the new leader until it receives and acts on the StopReplicaRequest. So the new leader adds it back to the ISR anyway.

        Then I thought I could move it to the end of the controlled shutdown API: once we check that all partitions have moved successfully, send the StopReplicaRequest to the broker being shut down, and right after that move the replica to Offline and remove it from the ISR as part of that operation. Even if we can invoke this operation as a batch, it will still be slow since the zookeeper reads/writes happen serially. I also realized this is not required, for two reasons:

        1. Since the shutting-down broker is sent the StopReplicaRequest, it will stop its fetcher and fall out of the ISR. Until then, as long as the controller doesn't fail over, it will not be elected as the leader, even if it is in the ISR, since it is one of the shuttingDownBrokers (see the sketch after this list).

        2. Even if the controller fails over, by the time the new controller has initialized and is ready to serve, the StopReplicaRequest would've ensured that the shutdown broker is no longer in the ISR. And until the controller fails over, there cannot be any leader election anyway.
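
        An illustrative sketch of reason 1 (not the real ControlledShutdownLeaderSelector): even if the shutting-down broker lingers in the ISR, leader election on this controller skips it, because candidates are filtered against shuttingDownBrokers.

            def selectLeaderSketch(assignedReplicas: Seq[Int],
                                   isr: Set[Int],
                                   liveBrokers: Set[Int],
                                   shuttingDownBrokers: Set[Int]): Option[Int] =
              // pick the first assigned replica that is alive, in the ISR,
              // and not in the middle of a controlled shutdown
              assignedReplicas.find(r =>
                isr.contains(r) && liveBrokers.contains(r) && !shuttingDownBrokers.contains(r))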

        I've tested this patch on a 7-node cluster that gets continuously rolling-bounced, with ~100 producers sending production data to it and ~1000 consumers consuming data from it.

        Neha Narkhede made changes -
        Status: Open → In Progress
        Neha Narkhede created issue -

          People

          • Assignee: Neha Narkhede
          • Reporter: Neha Narkhede
          • Votes: 0
          • Watchers: 5
