KAFKA-1305

Controller can hang on controlled shutdown with auto leader balance enabled

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.8.2.0
    • Component/s: None
    • Labels: None

      Description

      This is relatively easy to reproduce especially when doing a rolling bounce.
      What happened here is as follows:

      1. The previous controller was bounced and broker 265 became the new controller.
      2. I went on to do a controlled shutdown of broker 265 (the new controller).
      3. In the meantime, the automatically scheduled preferred replica leader election process kicked in and started sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers) (t@113 below).
      4. While that was happening, the controlled shutdown process on 265 succeeded, deregistered the broker from ZooKeeper, and shut down the socket server.
      5. (ReplicaStateMachine normally removes deregistered brokers from the controller channel manager's list of brokers to send requests to. However, that removal cannot take place (t@18 below) because the preferred replica leader election task owns the controller lock.)
      6. So the request send thread to broker 265 goes into infinite retries.
      7. The entire broker shutdown process is blocked on controller shutdown for the same reason (it needs to acquire the controller lock).
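
      A minimal, self-contained Scala sketch of the lock/queue interaction described above (hypothetical names; controllerLock and requestQueue stand in for the controller lock in KafkaController and the bounded per-broker queue in ControllerChannelManager):

          import java.util.concurrent.LinkedBlockingQueue
          import java.util.concurrent.locks.ReentrantLock

          // Reproduction sketch: the send thread to the dead broker is stuck retrying and
          // never drains its queue, the rebalance task blocks on put() while holding the
          // controller lock, and the broker change listener can never acquire that lock.
          object ControllerDeadlockSketch extends App {
            val controllerLock = new ReentrantLock()
            val requestQueue   = new LinkedBlockingQueue[String](10) // controller.message.queue.size defaults to 10

            // The queue is already full because nothing is being taken from it.
            (1 to 10).foreach(i => requestQueue.put(s"request-$i"))

            // "kafka-scheduler-0" (t@117): preferred replica election grabs the controller
            // lock, then blocks forever on put() because the queue is full.
            val rebalanceTask = new Thread(new Runnable {
              def run(): Unit = {
                controllerLock.lock()
                try requestQueue.put("LeaderAndIsrRequest") // blocks; the lock is never released
                finally controllerLock.unlock()
              }
            }, "kafka-scheduler-0")

            // "ZkClient-EventThread" (t@18): the broker change listener needs the controller
            // lock to remove the dead broker (which would clear its queue), but never gets it.
            val brokerChangeListener = new Thread(new Runnable {
              def run(): Unit = {
                controllerLock.lock()
                try println("would remove the dead broker and drain its queue here")
                finally controllerLock.unlock()
              }
            }, "ZkClient-EventThread")

            rebalanceTask.start()
            Thread.sleep(100)
            brokerChangeListener.start() // both threads are now blocked for good; the JVM hangs
          }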

      Relevant portions from the thread-dump:

      "Controller-265-to-broker-265-send-thread" - Thread t@113
      java.lang.Thread.State: TIMED_WAITING
      at java.lang.Thread.sleep(Native Method)
      at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
      at kafka.utils.Utils$.swallow(Utils.scala:167)
      at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
      at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
      at kafka.utils.Logging$class.swallow(Logging.scala:94)
      at kafka.utils.Utils$.swallow(Utils.scala:46)
      at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
      at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)

      • locked java.lang.Object@6dbf14a7
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

      Locked ownable synchronizers:

      • None

      ...

      "Thread-4" - Thread t@17
      java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
      at sun.misc.Unsafe.park(Native Method)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
      at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
      at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
      at kafka.utils.Utils$.inLock(Utils.scala:536)
      at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
      at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
      at kafka.utils.Utils$.swallow(Utils.scala:167)
      at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
      at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
      at kafka.utils.Logging$class.swallow(Logging.scala:94)
      at kafka.utils.Utils$.swallow(Utils.scala:46)
      at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
      at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
      at kafka.Kafka$$anon$1.run(Kafka.scala:42)

      ...

      "kafka-scheduler-0" - Thread t@117
      java.lang.Thread.State: WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
      at sun.misc.Unsafe.park(Native Method)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
      at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
      at kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)

      • locked java.lang.Object@578b748f
        at kafka.controller.KafkaController.sendRequest(KafkaController.scala:657)
        at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
        at kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:282)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:282)
        at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:126)
        at kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:612)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1119)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1111)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1109)
        at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1088)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
        at kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1088)
        at kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:323)
        at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

      Locked ownable synchronizers:

      • locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840
      • locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4918530

      ...

      "ZkClient-EventThread-18-/kafka-shadow" - Thread t@18
      java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
      at sun.misc.Unsafe.park(Native Method)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
      at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
      at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
      at kafka.utils.Utils$.inLock(Utils.scala:536)
      at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:328)
      at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
      at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

      1. KAFKA-1305_2014-10-13_07:30:45.patch
        2 kB
        Sriharsha Chintalapani
      2. KAFKA-1305.patch
        7 kB
        Sriharsha Chintalapani
      3. KAFKA-1305.patch
        2 kB
        Sriharsha Chintalapani

        Issue Links

          Activity

          dmitrybugaychenko Dmitry Bugaychenko added a comment -

          Even with a fast dedicated channel there will be a race condition in switching leadership. It could be removed either by complicating the protocol (e.g., the new leader should take leadership only after getting a "not a leader" response in its fetcher thread from the old one, while the old leader should stop handling produce requests and allow fetches only from the new leader until it gets everything), or maybe it is worth considering getting rid of the controller in partition leader election and using distributed elections in ZK.

          becket_qin Jiangjie Qin added a comment -

          I see. The risk of this approach is that the controller or broker could potentially end up in an inconsistent state, because a timeout does not necessarily occur only on broker shutdown. In that case, some controller-to-broker messages are sent while others might not be.
          I think the key problem with the current approach is that we mix the data plane and the control plane, i.e. controller messages and user data are handled by the same request handlers on the Kafka server. So controller messages usually sit in the queue behind many user requests, which can delay the handling of controller messages for an almost arbitrary amount of time (the more leaders a broker has, the worse the situation). The right solution is probably a separate thread for handling controller messages, or prioritizing controller message handling. Giving priority to controller messages probably requires less change, because we just need to insert the controller message at the head of the queue instead of the tail.
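
          A rough sketch of the "insert at the head" idea (hypothetical stand-in types, not the actual kafka.network.RequestChannel API):

              import java.util.concurrent.LinkedBlockingDeque

              // Controller requests are prepended so they jump ahead of queued user requests;
              // user requests append as usual and handlers always pull from the head.
              final case class Request(api: String)

              class PrioritizingRequestQueue(capacity: Int) {
                private val deque = new LinkedBlockingDeque[Request](capacity)

                private def isControlRequest(r: Request): Boolean =
                  r.api == "LeaderAndIsr" || r.api == "UpdateMetadata" || r.api == "StopReplica"

                def send(r: Request): Unit =
                  if (isControlRequest(r)) deque.putFirst(r) else deque.putLast(r)

                def receive(): Request = deque.takeFirst()
              }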

          dmitrybugaychenko Dmitry Bugaychenko added a comment -

          The retry itself is in the channel manager, independent of the controller lock. The deadlock happens because one of the threads owning the controller lock is trying to put a message into the channel manager: it waits for free space but will never get it. With a timeout it won't wait forever and will eventually fail the operation, giving the controller a chance to handle the broker failure (which includes closing the corresponding channel and emptying its queue).
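
          A small sketch of the timed put being described, assuming the bounded queue stays (hypothetical class; the real change would be around ControllerChannelManager.sendRequest):

              import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

              // A caller holding the controller lock gets a failure back instead of blocking
              // forever on a full queue, so broker-failure handling can still run.
              class TimedBrokerQueue[T](capacity: Int, putTimeoutMs: Long) {
                private val queue = new LinkedBlockingQueue[T](capacity)

                // Returns false if the queue stays full for putTimeoutMs, letting the caller
                // release the controller lock and surface the failure.
                def trySend(request: T): Boolean =
                  queue.offer(request, putTimeoutMs, TimeUnit.MILLISECONDS)

                def take(): T = queue.take()
              }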

          becket_qin Jiangjie Qin added a comment -

          It is not clear to me how adding a timeout when putting messages into the broker queue in the controller would help. This operation is done under the controller lock and has to be retried indefinitely. I would guess that in a parallel shutdown you might still see a deadlock.

          dmitrybugaychenko Dmitry Bugaychenko added a comment -

          Yes, data loss is tolerated to some extent. With the small queue it meant losing less than a second of data, or even none, which was considered fine; but with the extended queue it is a matter of a few minutes - using acks in this case will simply cause producers to crash, deny service, or drop messages (because for a few minutes they basically cannot produce). In the end we decided to reduce the queue to the default and apply three patches:

          1. Add throttling to preferred replica elections and controlled shutdown leadership reassignment
          2. Add a timeout for adding messages to the queue in order to avoid a locked controller
          3. Add a separate timeout for sending the controlled shutdown message - in our setup it takes about 10 minutes, and that value is meaningless and dangerous for other kinds of controller-to-broker communication

          Things seem to work and data is not lost, but shutdown and rebalance are slow. Instead of throttling it might be better to wait for the previous leader movement to be completed by all participants before moving to the next one. It is also possible to do leader movements in batches (at least the API seems to support that).

          becket_qin Jiangjie Qin added a comment -

          That's a good point. But I kind of think the right solution to that problem does not lie in the queue size, because there will be data loss in leader migration today, more or less, and the amount is non-deterministic. So my understanding is that either the user can tolerate data loss or the user needs to use acks=-1.

          dmitrybugaychenko Dmitry Bugaychenko added a comment -

          With a large controller queue size we see significant data loss during preferred replica election for brokers leading a lot of partitions (100+). The problem is that the preferred replica handles its requests fast, empties its request queue, and stops following the others, but the old leader handles requests much more slowly, and it takes a few minutes before it stops considering itself the leader for all re-elected partitions. During these few minutes the old leader continues to acknowledge produce requests, and at the end it recognizes it is no longer the leader and truncates its log, deleting all the data it received...

          sriharsha Sriharsha Chintalapani added a comment -

          Jason Rosenberg The fix is in the config. So if you are using 0.8.1.1 you can enable automatic preferred leader election, but make sure you set controller.message.queue.size to Int.MaxValue; by default this is set to 10 in 0.8.1.1.
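
          For illustration, the workaround would amount to broker settings roughly like the following (a hedged sketch; verify the property names against the 0.8.1.1 KafkaConfig before relying on them):

              import java.util.Properties

              // Sketch of the 0.8.1.1 workaround described above.
              object WorkaroundConfig {
                val props = new Properties()
                props.put("auto.leader.rebalance.enable", "true")
                props.put("controller.message.queue.size", Int.MaxValue.toString) // 2147483647
                // e.g. set the same keys in server.properties, or pass props to the broker's KafkaConfig
              }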

          jbrosenberg@gmail.com Jason Rosenberg added a comment -

          Is it safe to say then, if we are not yet on 0.8.2 (e.g. still on 0.8.1.1), we should not enable automatic preferred leader election?

          nehanarkhede Neha Narkhede added a comment -

          Sriharsha Chintalapani Latest patch looks good. Pushing it to trunk and 0.8.2

          sriharsha Sriharsha Chintalapani added a comment -

          Updated reviewboard https://reviews.apache.org/r/26633/diff/
          against branch origin/trunk

          junrao Jun Rao added a comment -

          Perhaps we can just set the default queue size to max int for now. If we don't see any issue with this, we can make the LinkedBlockingQueue unbounded later. Could you also change the default value for auto.leader.rebalance.enable?

          sriharsha Sriharsha Chintalapani added a comment -

          Created reviewboard https://reviews.apache.org/r/26633/diff/
          against branch origin/trunk

          sriharsha Sriharsha Chintalapani added a comment -

          Neha Narkhede So the changes you are looking for are to remove the config option and make the LinkedBlockingQueue unbounded?

          nehanarkhede Neha Narkhede added a comment -

          Increasing the queue size by a little doesn't really solve the problem. We should conduct more tests on an unbounded controller queue, if we have any doubt whether or not it will work. Sriharsha Chintalapani I will help you review changes to the controller, if you are up for updating your patch.

          junrao Jun Rao added a comment -

          Yes, in theory we can make the queue unbounded. However, in practice the queue shouldn't build up. I was a bit concerned that if we make the queue unbounded and another issue causes the queue to build up, we may hit an OOME. Then we may not be able to take a thread dump to diagnose the issue.

          sriharsha Sriharsha Chintalapani added a comment -

          Neha Narkhede Jun Rao My understanding is that by setting it to 10k we created more room for the KafkaController not to get into any of the above-mentioned issues, but yes, making it unbounded is a better option, since there is still a chance of exhausting a 10k bounded queue and running into issues. We can get rid of controller.message.queue.size as a config option and make the LinkedBlockingQueue unbounded.

          nehanarkhede Neha Narkhede added a comment -

          Jun Rao, Sriharsha Chintalapani What's the value in changing it from something to 10K vs unbounded?

          sriharsha Sriharsha Chintalapani added a comment -

          Created reviewboard https://reviews.apache.org/r/26560/diff/
          against branch origin/trunk

          junrao Jun Rao added a comment -

          Sriharsha,

          Thanks for testing this out. Based on your result, it seems that auto leader balancing is stable with a large controller channel queue. Could you create a patch by changing the default value of controller.message.queue.size to 10000 and auto.leader.rebalance.enable to true? Once that's done, we can resolve this jira. We can file a separate jira for controller refactoring.

          sriharsha Sriharsha Chintalapani added a comment -

          Jun Rao I ran the tests with your suggested change. All of the controller shutdowns went through without any issue.
          I'll file a separate JIRA for the autoRebalanceScheduler.shutdown deadlock.

          sriharsha Sriharsha Chintalapani added a comment -

          Thanks Jun Rao. Testing with above change.

          junrao Jun Rao added a comment -

          Sriharsha,

          It seems that you found a deadlock.

          In KafkaController.onControllerResignation(), it holds the controller lock while calling autoRebalanceScheduler.shutdown(). The auto rebalance scheduler couldn't shut down since it's blocked waiting for the controller lock in checkAndTriggerPartitionRebalance().

          To break the deadlock, we can move autoRebalanceScheduler.shutdown() in onControllerResignation() to before acquiring the controller lock. We don't need to hold the controller lock while shutting down the scheduler.
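
          A simplified sketch of that ordering (hypothetical names, not the actual KafkaController code):

              import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
              import java.util.concurrent.locks.ReentrantLock

              // Stop the auto-rebalance scheduler *before* taking the controller lock, so a
              // rebalance run that is waiting for the lock can still acquire it and finish.
              class ResignationSketch {
                private val controllerLock = new ReentrantLock()
                private val autoRebalanceScheduler: ScheduledExecutorService =
                  Executors.newSingleThreadScheduledExecutor()

                // periodic checkAndTriggerPartitionRebalance(): needs the controller lock
                autoRebalanceScheduler.scheduleAtFixedRate(new Runnable {
                  def run(): Unit = { controllerLock.lock(); try () finally controllerLock.unlock() }
                }, 0, 5, TimeUnit.SECONDS)

                def onControllerResignation(): Unit = {
                  // 1. Shut the scheduler down outside the lock; an in-flight rebalance can
                  //    still grab the lock, complete, and let awaitTermination() return.
                  autoRebalanceScheduler.shutdown()
                  autoRebalanceScheduler.awaitTermination(30, TimeUnit.SECONDS)

                  // 2. Only then take the controller lock for the remaining resignation steps
                  //    (deregister listeners, shut down state machines, close the channel manager).
                  controllerLock.lock()
                  try println("remaining resignation steps run here")
                  finally controllerLock.unlock()
                }
              }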

          sriharsha Sriharsha Chintalapani added a comment -

          Jun Rao I ran the above test you suggested with leader.imbalance.check.interval.seconds set to 30 and controller.message.queue.size set to 10000, with 5 brokers and 1500 topics with 3 partitions and a replication factor of 3. I am able to run into a case where a broker prints "Shutdown completed" but the process still hangs. I am rerunning the test with controller.message.queue.size set to a higher number.
          Here is the thread dump:

          Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode):

          "Attach Listener" daemon prio=10 tid=0x00007f8860003000 nid=0x26cc waiting on condition [0x0000000000000000]
          java.lang.Thread.State: RUNNABLE

          "Controller-4-to-broker-3-send-thread" prio=10 tid=0x00007f884c049000 nid=0x26b2 waiting on condition [0x00007f83c99e0000]
          java.lang.Thread.State: WAITING (parking)
          at sun.misc.Unsafe.park(Native Method)

          • parking to wait for <0x00000000d2be8008> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
            at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
            at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:121)
            at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

          "Thread-2" prio=10 tid=0x00007f8868005800 nid=0x26b1 waiting on condition [0x00007f83c98df000]
          java.lang.Thread.State: TIMED_WAITING (parking)
          at sun.misc.Unsafe.park(Native Method)

          • parking to wait for <0x00000000d2a34508> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
            at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
            at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468)
            at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:88)
            at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:353)
            at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:348)
            at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:348)
            at kafka.utils.Utils$.inLock(Utils.scala:535)
            at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:348)
            at kafka.controller.KafkaController.shutdown(KafkaController.scala:663)
            at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:287)
            at kafka.utils.Utils$.swallow(Utils.scala:172)
            at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
            at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
            at kafka.utils.Logging$class.swallow(Logging.scala:94)
            at kafka.utils.Utils$.swallow(Utils.scala:45)
            at kafka.server.KafkaServer.shutdown(KafkaServer.scala:287)
            at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:40)
            at kafka.Kafka$$anon$1.run(Kafka.scala:42)

          "SIGTERM handler" daemon prio=10 tid=0x00007f8860002000 nid=0x26ae in Object.wait() [0x00007f83c9be2000]
          java.lang.Thread.State: WAITING (on object monitor)
          at java.lang.Object.wait(Native Method)

          • waiting on <0x00000000d018bda8> (a kafka.Kafka$$anon$1)
            at java.lang.Thread.join(Thread.java:1281)
          • locked <0x00000000d018bda8> (a kafka.Kafka$$anon$1)
            at java.lang.Thread.join(Thread.java:1355)
            at java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
            at java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
            at java.lang.Shutdown.runHooks(Shutdown.java:123)
            at java.lang.Shutdown.sequence(Shutdown.java:167)
            at java.lang.Shutdown.exit(Shutdown.java:212)
          • locked <0x00000000d0080660> (a java.lang.Class for java.lang.Shutdown)
            at java.lang.Terminator$1.handle(Terminator.java:52)
            at sun.misc.Signal$1.run(Signal.java:212)
            at java.lang.Thread.run(Thread.java:745)

          "kafka-scheduler-0" daemon prio=10 tid=0x00007f884c045000 nid=0x26aa waiting on condition [0x00007f83c9ee5000]
          java.lang.Thread.State: WAITING (parking)
          at sun.misc.Unsafe.park(Native Method)

          • parking to wait for <0x00000000cfb1b800> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
            at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
            at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
            at kafka.utils.Utils$.inLock(Utils.scala:533)
            at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1149)
            at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1147)
            at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
            at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
            at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
            at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
            at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
            at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1147)
            at kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1126)
            at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
            at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
            at kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1126)
            at kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:326)
            at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:99)
            at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)

          "Controller-4-to-broker-1-send-thread" prio=10 tid=0x00007f884c019800 nid=0x26a9 waiting on condition [0x00007f83c9fe6000]
          java.lang.Thread.State: WAITING (parking)
          at sun.misc.Unsafe.park(Native Method)

          • parking to wait for <0x00000000d2828b58> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
            at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
          sriharsha Sriharsha Chintalapani added a comment -

          Jun Rao I ran into this issue while testing topic deletion and simultaneously running the preferred replica leader election tool. I am running the above test you suggested and will update with the results.

          jkreps Jay Kreps added a comment -

          Well I guess the point is that they shouldn't be mutually exclusive. So hopefully we can make them both be enabled by default.

          noslowerdna Andrew Olson added a comment -

          > from my perspective being able to enable auto leader balancing would be a huge win

          We are running with auto leader balance enabled and controlled shutdown disabled. Given that they're currently mutually exclusive options, is controlled shutdown generally considered more valuable than auto leader balancing? If so, why is that?

          jkreps Jay Kreps added a comment -

          FWIW, from my perspective being able to enable auto leader balancing would be a huge win. This is arguably the biggest operational "gotcha" today...

          junrao Jun Rao added a comment -

          Thinking about this a bit more: as a short-term fix, I think Jiangjie's suggestion actually has a point. Basically, if a channel queue is full (since the target broker is down), a thread that tries to put a request into the queue will block while holding the controller lock. This essentially stalls the controller, since it can't process any other events, which is bad. Now, imagine we make the channel queue unbounded. In this case, no thread will block on putting requests into the queue. So, if a broker is down, the controller will always be able to act on it, which will clear up the queue and remove the channel to the dead broker. The only downside is that if there are outstanding requests in a queue, new important requests may be delayed. This is not a big concern because in the common case the channel queue shouldn't build up.

          Sriharsha,

          Could you do a bit of testing on this? Basically, set the default value of controller.message.queue.size to something large (e.g., 10K). Create a cluster with a few thousand partitions per broker. Enable auto leader balancing and keep doing rolling bounces of the cluster (with controlled shutdown enabled) and see if there is any issue. Ideally, we want to hit the case where the auto leader balancing happens concurrently with the controlled shutdown in the controller. So, you may want to play with leader.imbalance.check.interval.seconds.
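
          A sketch of what the unbounded per-broker queue would look like (hypothetical types; QueueItem stands in for the request/callback pairs the real ControllerChannelManager enqueues):

              import java.util.concurrent.LinkedBlockingQueue

              // With an unbounded queue, a thread holding the controller lock never blocks on
              // put(), so broker-failure handling can always proceed and drain the queue.
              final case class QueueItem(apiKey: Short, request: AnyRef)

              class BrokerChannelQueue {
                // No capacity argument means Integer.MAX_VALUE, i.e. effectively unbounded.
                private val messageQueue = new LinkedBlockingQueue[QueueItem]()

                def sendRequest(item: QueueItem): Unit = messageQueue.put(item) // never blocks the lock holder
                def nextRequest(): QueueItem           = messageQueue.take()    // drained by the broker's send thread
                def backlog: Int                       = messageQueue.size()    // worth watching as a metric
              }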

          nehanarkhede Neha Narkhede added a comment -

          Sriharsha Chintalapani It depends. Basically, my perspective is that if doing this correctly requires delaying 0.8.2 by a month, then let's push it to 0.8.3. If there is a small fix for the issue, then let's include it. IIRC, Jun Rao was going to take a stab at figuring out whether there is a small fix or not.

          sriharsha Sriharsha Chintalapani added a comment -

          Neha Narkhede Is this still planned for the 0.8.2 release?

          nehanarkhede Neha Narkhede added a comment -

          This is a good but potentially very large change. Basically, controller functionality would need to be modeled as multi-step actions that may require an arbitrary delay between the steps that constitute an action. As such, the rest of the logic definitely needs to go back into the queue.

          guozhang Guozhang Wang added a comment -

          Yeah. Reassigning partitions takes time and hence is better handled in a purgatory, but then there are a couple more subtle issues we need to be careful with:

          1. Once the condition (new replica caught up, etc.) is satisfied, should we execute the rest of the logic in the satisfaction-checking thread, which would then be a different thread, or just put the rest of the job back into the queue?

          2. Related to 1), while a topic is undergoing partition reassignment or any other "delayable" operation, we need to disable other operations on that topic, right?

          I agree that this is a rather big change and maybe we should push it to after 0.8.2.

          junrao Jun Rao added a comment -

          Guozhang,

          Yes, I agree that the controller code is getting complicated and it's probably worth a refactoring. A single threaded controller perhaps will make the logic simpler and easier to understand. There are operations like reassigning partitions that are time consuming though. Instead of blocking on those operations, do we want to use a purgatory to track them?

          Since it may take time to completely fix this issue, I suggest that we just disable this feature in 0.8.2. Otherwise, people may turn it on and hit this issue.

          guozhang Guozhang Wang added a comment -

          Thanks Jiangjie Qin for the great findings. It seems to me that, as long as the controller's channel manager is async, the corner case can still happen no matter how large its queue is (i.e., requests stuck in the queue for brokers that have already shut down but whose ZK watcher has not fired yet), causing a chain of lock conflicts.

          Currently the controller has multiple threads for admin commands, ZK listeners, scheduled operations (leader election), etc., which complicates the locking mechanism inside the controller. After going through the code I think it would be better to refactor the controller as follows:

          1. Besides the async channel manager's sender thread, use only a single controller thread with a single work queue for the controller thread.
          2. ZK watch handling logic determines the event (topic/partition/broker change, admin operation, etc.) and puts the task into the queue.
          3. Scheduled tasks are also created periodically and put into the queue.
          4. The controller does one task at a time, so tasks do not need to compete for locks on controller metadata.
          5. Make the channel manager's queue size unbounded and add a metric to monitor its size.

          With this the controller logic would be easier to read and debug, and it may also help KAFKA-1558. The downside is that, since a single thread is used, we lose parallelism in controller task handling, and the unbounded channel queue may also be an issue (if there is a bug). But since controller tasks are usually rare in practice, this should not be a problem.
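
          A rough sketch of the single-threaded, event-queue structure described above (illustrative event names only, not a concrete design):

              import java.util.concurrent.LinkedBlockingQueue

              // Listeners and scheduled jobs only enqueue events; one thread applies them,
              // so there is no lock contention on controller metadata.
              sealed trait ControllerEvent
              final case class BrokerChange(liveBrokerIds: Set[Int]) extends ControllerEvent
              final case class TopicChange(topics: Set[String])      extends ControllerEvent
              case object PreferredReplicaLeaderElection             extends ControllerEvent

              class SingleThreadedControllerSketch {
                private val eventQueue = new LinkedBlockingQueue[ControllerEvent]() // unbounded; expose size as a metric

                // Called from ZK watcher threads and the scheduler: enqueue only, never touch controller state.
                def enqueue(event: ControllerEvent): Unit = eventQueue.put(event)

                private val controllerThread = new Thread(new Runnable {
                  def run(): Unit =
                    while (!Thread.currentThread().isInterrupted) {
                      eventQueue.take() match { // one event at a time
                        case BrokerChange(_)                => () // update live broker set, handle failed brokers
                        case TopicChange(_)                 => () // handle created/deleted topics
                        case PreferredReplicaLeaderElection => () // run preferred replica election
                      }
                    }
                }, "controller-event-thread")

                def startup(): Unit = controllerThread.start()
              }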

          becket_qin Jiangjie Qin added a comment - edited

          I looked into this problem and it seems to me the issue is mainly that the default controller message queue is too small.
          The problem flow is as follows:
          1. Controller 265 received a controlled shutdown request.
          2. Controller 265 put LeaderAndIsrRequests into the controller message queue and responded to broker 265.
          3. Broker 265 received the response from controller 265, shut down successfully, and de-registered itself from ZK.
          4. The request send thread on controller 265 started sending the LeaderAndIsrRequests queued in step 2. Since broker 265 had already shut down, the send thread went into infinite retries, so from that point the controller message queue size never decreases. (Thread t@113)
          5. The scheduled preferred leader election started, grabbed the controller lock, and tried to put a LeaderAndIsr request into the controller message queue. Because the queue size is only 10, the put blocked while the thread was still holding the controller lock. (Thread t@117)
          6. The broker change listener on controller 265 was triggered by the broker path change from step 3. It tried to grab the controller lock and stop thread t@113, but could not, because thread t@117 was holding the controller lock while waiting on the controller message queue.

          Currently the controller message queue size is 10. IMO if we increase it to 100 or even larger, this problem won't happen again. In practice the queue is usually small or even empty because there are not many controller messages, so increasing the queue size won't noticeably increase memory consumption. The lock-plus-bounded-queue interaction is sketched below.
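          An illustrative Scala sketch (not Kafka code) of the deadlock pattern described above: a thread holding the controller lock blocks on put() against a small bounded queue, while the only thread that could free the queue or stop the retries needs that same lock first.

          import java.util.concurrent.LinkedBlockingQueue
          import java.util.concurrent.locks.ReentrantLock

          object BoundedQueueDeadlockSketch {
            val controllerLock = new ReentrantLock()
            val messageQueue   = new LinkedBlockingQueue[String](10) // the size-10 queue from step 5

            // kafka-scheduler thread (t@117): preferred replica leader election.
            def scheduledElection(): Unit = {
              controllerLock.lock()
              try {
                // If the send thread (t@113) is stuck retrying a dead broker, the queue
                // stays full and this put() blocks indefinitely -- while holding the lock.
                messageQueue.put("LeaderAndIsrRequest")
              } finally controllerLock.unlock()
            }

            // Broker change listener (step 6): needs the same lock to remove the dead
            // broker and stop its send thread, so it parks behind scheduledElection().
            def onBrokerChange(): Unit = {
              controllerLock.lock()
              try {
                // remove broker 265, shut down its RequestSendThread, clear its queue
              } finally controllerLock.unlock()
            }
          }

          As noted in the comment above, enlarging the queue only makes the blocking put() less likely; the lock cycle itself remains unless the send thread can give up on dead brokers or the long operations are decoupled from the lock.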

          guozhang Guozhang Wang added a comment -

          Moving out of 0.8.2 for now.

          junrao Jun Rao added a comment -

          Right, until this is fixed, don't set both auto.leader.rebalance.enable and controlled.shutdown.enable to true.
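          In server.properties terms, a minimal sketch of that workaround (keep controlled shutdown, leave automatic leader rebalancing off):

          auto.leader.rebalance.enable=false
          controlled.shutdown.enable=true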

          noslowerdna Andrew Olson added a comment -

          Is it recommended to not set both auto.leader.rebalance.enable=true and controlled.shutdown.enable=true?

          If this issue is encountered, killing the hung broker process sounds like the only resolution. Would that cause any issues for the other brokers in the cluster? Also, would it be problematic if the hung broker is not killed in a timely manner?

          guozhang Guozhang Wang added a comment -

          This is a similar issue to KAFKA-1235. One alternative solution that would address both issues is to allow the sender thread to break out of the infinite retry once it realizes that the destination broker is shutting down.
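          A minimal Scala sketch of that idea (hypothetical names, not the actual RequestSendThread code): before each retry the sender thread re-checks the controller's view of live brokers and gives up once the destination broker is gone.

          object SenderRetrySketch {
            // isBrokerLive would be backed by the controller's live-broker set;
            // send() performs one attempt and returns true on success.
            def sendWithRetry(brokerId: Int,
                              isBrokerLive: Int => Boolean,
                              send: () => Boolean,
                              backoffMs: Long = 300L): Boolean = {
              var sent = false
              while (!sent && isBrokerLive(brokerId)) { // break out once the broker is shutting down
                sent = send()
                if (!sent) Thread.sleep(backoffMs)
              }
              sent
            }
          }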


            People

            • Assignee: sriharsha Sriharsha Chintalapani
            • Reporter: jjkoshy Joel Koshy
            • Reviewer: Jun Rao
            • Votes: 1
            • Watchers: 14
