diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index beca460..900b429 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -90,6 +90,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
   private def removeExistingBroker(brokerId: Int) {
     try {
       brokerStateInfo(brokerId).channel.disconnect()
+      brokerStateInfo(brokerId).messageQueue.clear()
       brokerStateInfo(brokerId).requestSendThread.shutdown()
       brokerStateInfo.remove(brokerId)
     }catch {
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 3beaf75..a0ed8e4 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -888,6 +888,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    */
   @throws(classOf[Exception])
   def handleNewSession() {
+    info("ZK expired; shut down all controller components and try to re-elect")
     controllerContext.controllerLock synchronized {
       Utils.unregisterMBean(KafkaController.MBeanName)
       partitionStateMachine.shutdown()
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 829163a..5859ce7 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -401,7 +401,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           val partitionsRemainingToBeAdded = partitionReplicaAssignment.filter(p =>
             !controllerContext.partitionReplicaAssignment.contains(p._1))
           info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded))
-          controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
+          if (partitionsRemainingToBeAdded.size > 0)
+            controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
         } catch {
           case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
         }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 29abc46..c9f92a2 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -519,11 +519,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     uniqueTopics = {
       if(metadataRequest.topics.size > 0)
         metadataRequest.topics.toSet
-      else {
-        partitionMetadataLock synchronized {
-          leaderCache.keySet.map(_.topic)
-        }
-      }
+      else
+        leaderCache.keySet.map(_.topic)
     }
     val topicMetadataList = partitionMetadataLock synchronized {