diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 9b0f7e9..7747ddd 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -126,6 +126,10 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe def addResponseListener(onResponse: Int => Unit) { responseListeners ::= onResponse } + + def shutdown() { + requestQueue.clear + } } object RequestMetrics { diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b056e25..8f0053a 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -71,6 +71,7 @@ class SocketServer(val brokerId: Int, acceptor.shutdown() for(processor <- processors) processor.shutdown() + requestChannel.shutdown info("shut down completely") } } diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index f0c05a5..69ca058 100644 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -32,7 +32,7 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha while(true) { try { val req = requestChannel.receiveRequest() - if(req eq RequestChannel.AllDone){ + if(req eq RequestChannel.AllDone) { trace("receives shut down command, shut down".format(brokerId, id)) return } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 1fe1ca9..09e261f 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -110,6 +110,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg info("shutting down") val canShutdown = isShuttingDown.compareAndSet(false, true); if (canShutdown) { + if(socketServer != null) + Utils.swallow(socketServer.shutdown()) if(requestHandlerPool != null) Utils.swallow(requestHandlerPool.shutdown()) Utils.swallow(kafkaScheduler.shutdown()) @@ -119,8 +121,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Utils.swallow(kafkaZookeeper.shutdown()) if(replicaManager != null) Utils.swallow(replicaManager.shutdown()) - if(socketServer != null) - Utils.swallow(socketServer.shutdown()) if(logManager != null) Utils.swallow(logManager.shutdown())