Kafka / KAFKA-498

Controller code has race conditions and synchronization bugs

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: None
    • Labels:

      Description

      The controller maintains some internal data structures that are updated by state changes triggered by zookeeper listeners. There are race conditions in the controller channel manager and the controller state machine.

      1. kafka-498-v2.patch
        101 kB
        Neha Narkhede
      2. kafka-498-v2.patch
        101 kB
        Neha Narkhede
      3. kafka-498-v1.patch
        99 kB
        Neha Narkhede

          Activity

          Jun Rao added a comment -

          +1 on the latest patch.

          Neha Narkhede added a comment -

          Thanks for the review!

          1. Good point. Included Broker in ControllerBrokerStateInfo.
          2.1 brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1)) looks simpler.
          2.2 Although it will be slightly inefficient to do that, it is probably easier to read.
          3.1 This API is pretty badly named and designed. It will be refactored in KAFKA-499, but I changed the name to what you suggested for now.
          3.2 Not sure it is very useful in the context of the controller's state machine. But right now, it is very unclear what the state machine is and which transitions cause what state changes. I think I will have a better handle on this in KAFKA-499.
          4. Synchronized the addBroker public API on the brokerLock.
          5. Synchronized the sendRequest public API on the brokerLock.

          Jun Rao added a comment -

          Thanks for the patch. Overall, a good cleanup patch. Some minor comments:

          1. ControllerBrokerStateInfo: Should we include Broker in this class too?

          2. ControllerChannelManager:
          2.1 brokers.foreach(broker => brokerStateInfo(broker._1).requestSendThread.start()) can be done as
          brokers.foreach { case (brokerId, _) => brokerStateInfo(brokerId).requestSendThread.start() }

          2.2 startup: Should call startRequestSendThread()

          3. readLeaderAndIsrFromZookeeper:
          3.1 This method also sends leaderAndISR requests to brokers, in addition to reading from ZK. Maybe we can call it readAndSendLeaderAndIsrFromZookeeper?
          3.2 If no replicas are assigned to a partition, is it necessary to log the assignment for all partitions? Ditto for onBrokerChange
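
The style point in 2.1 can be illustrated with a small sketch. The `brokers` map and the `seen` sets here are hypothetical stand-ins, not the actual ControllerChannelManager fields; the only point is that a pattern-matching closure binds the key to a descriptive name instead of reaching for `_1`.

```scala
import scala.collection.mutable

object ForeachStyles {
  // Hypothetical stand-in for the controller's broker map: broker id -> host name.
  val brokers: Map[Int, String] = Map(1 -> "host-a", 2 -> "host-b")

  // Tuple-accessor style: the key is reached through broker._1.
  def idsViaAccessor(): Set[Int] = {
    val seen = mutable.Set.empty[Int]
    brokers.foreach(broker => seen += broker._1)
    seen.toSet
  }

  // Pattern-matching style: the key is bound to a descriptive name.
  def idsViaPattern(): Set[Int] = {
    val seen = mutable.Set.empty[Int]
    brokers.foreach { case (brokerId, _) => seen += brokerId }
    seen.toSet
  }
}
```

Both methods visit the same entries; the difference is purely one of readability.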

          Neha Narkhede added a comment -

          Deleted an unused file, fixed minor typo

          Neha Narkhede added a comment -

          1. Refactoring of current procedural controller code to functional style
          1.1. Using match/case expressions instead of if-else
          1.2. Avoid explicit return statements from try-catch blocks
          1.3. Use Option instead of null. This is very important: there were several places that handled only the non-null case, basically making way for NPEs in error cases.
          1.4. Rename variables in all capital letters to camel case
          1.5. Fixed logging since a lot of log statements were at info and were somewhat unclear
          1.6. Fixed indentation; some places used 4 spaces, others used 2 spaces.
          1.7. ZkUtils.scala
          1.7.1 Refactored readDataMaybeNull to return an Option instead of null. This allows all usages of that API that query an ephemeral path to handle the case when the value in zookeeper does not exist anymore
          1.7.2. Refactored getBrokerInfoFromIds to handle only a single broker id. The reason is that most usages of that API used it to query for a single broker id.
          1.8 Many places in the code did not wrap the lines correctly. Fixed this as much as I could.
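
The motivation for 1.3 and 1.7.1 can be sketched as follows. The in-memory `store`, the paths, and `describeBroker` are hypothetical stand-ins for ZooKeeper and its callers, and the method below is a simplified illustration rather than the actual ZkUtils signature. It only shows how returning an Option makes a vanished ephemeral node visible in the type, so callers cannot forget the missing case.

```scala
object OptionInsteadOfNull {
  // Hypothetical in-memory stand-in for ZooKeeper data: path -> payload.
  private val store: Map[String, String] = Map("/brokers/ids/1" -> "host-a:9092")

  // Simplified illustration: returns None when the path is missing
  // (e.g. an ephemeral node that has gone away) rather than null.
  def readDataMaybeNull(path: String): Option[String] = store.get(path)

  // Callers now handle both the present and the absent case explicitly.
  def describeBroker(id: Int): String =
    readDataMaybeNull(s"/brokers/ids/$id") match {
      case Some(info) => s"broker $id at $info"
      case None       => s"broker $id is not registered"
    }
}
```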

          2. KafkaController.scala
          2.1 Renamed deliverLeaderAndISRFromZookeeper to readLeaderAndIsrFromZookeeper
          2.2 There is a race condition in KafkaController where it doesn't synchronize access to the controller's data structures while creating a new session. Basically, controllerRegisterOrFailover is a private API that modifies almost all the internal controller data structures that require synchronization. Since this API is not synchronizing on the controller lock, all usages of this API need to do this correctly. Fixed handleNewSession to synchronize the controllerRegisterOrFailover API, since that can be concurrently executed with the startup procedure.
          2.3 In onBrokerChange, defaulting to empty list instead of null. This lets us avoid null checks
          2.4 Refactored onBrokerChange() API and moved the leader election logic to a separate API. After starting on this path, I figured that this code is going to need a major refactoring that I'd like to fix in a separate patch. Filed KAFKA-499 to cover that. For now, we can keep this although it will look complete only after KAFKA-499 is in.
          2.5 onBrokerChange() and initLeaders() APIs are very similar and duplicate quite a lot of code. I refactored onBrokerChange() but realized later that initLeaders would have to be refactored too. To keep the changes in this patch small enough, I will fix this as part of KAFKA-499.

          3. ControllerChannelManager & KafkaController
          3.1 There are 3 types of information that the controller maintains per broker - request thread, message queue and socket channel. Currently, they are maintained in 3 separate variables and we need to ensure all 3 are synchronized correctly. To simplify this, I created a case class to wrap this state in one object.
          3.2 There is a lock object whose purpose is to synchronize access to the broker cache. Currently, the lock doesn't seem to protect all access to these caches, which looks like a synchronization bug. Fixed this to have startup API synchronize access to the broker cache.
          3.3. The addBroker logic was duplicated in 2 places. Refactored the constructors to add an auxiliary constructor to call the addBroker API that handles creating controller state info for a new broker and the appropriate synchronization. Currently, the constructor duplicates code to add a new broker, probably since it is not right to invoke an API from inside the primary constructor.
          3.4 There are 2 ways to remove a broker from the controller's broker cache - during shutdown of the channel manager or when a broker change listener fires. Since removeBroker is a public API, it is synchronized using the brokerLock. The shutdown API calls removeBroker internally and ends up acquiring and releasing the lock multiple times. Ideally, it is sufficient to acquire the brokerLock just once for the entire shutdown API. Refactored to move the common removeBroker logic to a private API that doesn't synchronize on the brokerLock. Changed removeBroker to acquire lock and call removeExistingBroker, changed shutdown to acquire lock once and call removeExistingBroker.
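
The locking pattern described in 3.1 and 3.4 can be sketched roughly as below. This is a minimal illustration, not the code from the patch: `BrokerState`, `ChannelManagerSketch`, `withBrokerLock`, `brokerCount`, and the `lockAcquisitions` counter are hypothetical names, and the per-broker state is reduced to a queue. Only `brokerLock`, `addBroker`, `removeBroker`, `removeExistingBroker`, and `shutdown` follow the names used in the description.

```scala
import scala.collection.mutable

// Hypothetical, simplified per-broker state wrapped in one case class.
case class BrokerState(brokerId: Int, queue: mutable.Queue[String])

class ChannelManagerSketch {
  private val brokerLock = new Object
  private val brokerStateInfo = mutable.HashMap.empty[Int, BrokerState]
  // Counts lock acquisitions by the public mutating APIs; for illustration only.
  var lockAcquisitions = 0

  private def withBrokerLock[T](body: => T): T = brokerLock.synchronized {
    lockAcquisitions += 1
    body
  }

  // Public API: synchronizes on brokerLock.
  def addBroker(brokerId: Int): Unit = withBrokerLock {
    brokerStateInfo(brokerId) = BrokerState(brokerId, mutable.Queue.empty[String])
  }

  // Public API: acquires the lock, then delegates to the private helper.
  def removeBroker(brokerId: Int): Unit = withBrokerLock {
    removeExistingBroker(brokerId)
  }

  // Acquires the lock once for the whole shutdown instead of once per broker.
  def shutdown(): Unit = withBrokerLock {
    brokerStateInfo.keys.toList.foreach(removeExistingBroker)
  }

  // Private helper: does not lock, so callers must already hold brokerLock.
  private def removeExistingBroker(brokerId: Int): Unit = {
    brokerStateInfo.remove(brokerId)
    ()
  }

  def brokerCount: Int = brokerLock.synchronized { brokerStateInfo.size }
}
```

With three brokers registered, `shutdown()` in this sketch takes the lock once, whereas routing each removal through the public `removeBroker` would take it three times.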

          4. Renamed LeaderAndISR to LeaderAndIsr

          5. Renamed BrokerNotExistException to BrokerNotAvailableException to remain consistent with other exceptions of the same type (LeaderNotAvailableException, ReplicaNotAvailableException)

          NOTE: Apache svn seems to hang at the time of uploading this patch. It will apply cleanly on revision 1380945


            People

            • Assignee:
              Neha Narkhede
              Reporter:
              Neha Narkhede
            • Votes:
              0
              Watchers:
              2


                Time Tracking

                Estimated: 24h
                Remaining: 24h
                Logged: Not Specified