diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 9b0f7e9..1569ccf 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -18,28 +18,17 @@ package kafka.network
 
 import java.util.concurrent._
+import atomic.AtomicBoolean
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.nio.ByteBuffer
 import kafka.api._
-import kafka.common.TopicAndPartition
 import kafka.utils.{Logging, SystemTime}
-import kafka.message.ByteBufferMessageSet
 import java.net._
+import locks.ReentrantLock
 
 object RequestChannel extends Logging {
-  val AllDone = new Request(1, 2, getShutdownReceive(), 0)
-
-  def getShutdownReceive() = {
-    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, ByteBufferMessageSet]())
-    val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
-    byteBuffer.putShort(RequestKeys.ProduceKey)
-    emptyProducerRequest.writeTo(byteBuffer)
-    byteBuffer.rewind()
-    byteBuffer
-  }
-
   case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
     @volatile var dequeueTimeMs = -1L
     @volatile var apiLocalCompleteTimeMs = -1L
@@ -95,6 +84,9 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
   private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
   for(i <- 0 until numProcessors)
     responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
+  private val requestQueueLock = new ReentrantLock
+  private val requestAvailable = requestQueueLock.newCondition
+  private val isShuttingDown = new AtomicBoolean(false)
 
   newGauge(
     "RequestQueueSize",
@@ -105,7 +97,14 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
 
   /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
   def sendRequest(request: RequestChannel.Request) {
-    requestQueue.put(request)
+    requestQueueLock.lock
+    try {
+      requestQueue.put(request)
+      if(requestQueue.size == 1)
+        requestAvailable.signalAll
+    } finally {
+      requestQueueLock.unlock
+    }
   }
 
   /** Send a response back to the socket server to be sent over the network */
@@ -116,8 +115,19 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
   }
 
   /** Get the next request or block until there is one */
-  def receiveRequest(): RequestChannel.Request =
-    requestQueue.take()
+  def receiveRequest(): RequestChannel.Request = {
+    requestQueueLock.lock
+    var request: RequestChannel.Request = null
+    try {
+      while(requestQueue.size == 0 && !isShuttingDown.get)
+        requestAvailable.await
+      if(requestQueue.size > 0)
+        request = requestQueue.take
+      request
+    } finally {
+      requestQueueLock.unlock
+    }
+  }
 
   /** Get a response for the given processor if there is one */
   def receiveResponse(processor: Int): RequestChannel.Response =
@@ -126,6 +136,16 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
   def addResponseListener(onResponse: Int => Unit) {
     responseListeners ::= onResponse
   }
+
+  def close() {
+    requestQueueLock.lock
+    try {
+      isShuttingDown.set(true)
+      requestAvailable.signalAll
+    } finally {
+      requestQueueLock.unlock
+    }
+  }
 }
 
 object RequestMetrics {
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index b056e25..53f8d87 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -71,6 +71,7 @@ class SocketServer(val brokerId: Int,
     acceptor.shutdown()
     for(processor <- processors)
       processor.shutdown()
+    requestChannel.close
     info("shut down completely")
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index f0c05a5..0149644 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -20,32 +20,37 @@ package kafka.server
 import kafka.network._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 /**
  * A thread that answers kafka requests.
  */
 class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
   this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
+  val isRunning = new AtomicBoolean(true)
+  val shutdownComplete = new CountDownLatch(1)
 
   def run() {
-    while(true) {
+    while(isRunning.get) {
       try {
         val req = requestChannel.receiveRequest()
-        if(req eq RequestChannel.AllDone){
-          trace("receives shut down command, shut down".format(brokerId, id))
-          return
+        if(req != null) {
+          req.dequeueTimeMs = SystemTime.milliseconds
+          debug("handles request " + req)
+          apis.handle(req)
         }
-        req.dequeueTimeMs = SystemTime.milliseconds
-        debug("handles request " + req)
-        apis.handle(req)
       } catch {
         case e: Throwable => error("exception when handling request", e)
       }
     }
+    shutdownComplete.countDown
   }
 
-  def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
+  def shutdown(): Unit = {
+    isRunning.set(false)
+    shutdownComplete.await
+  }
 }
 
 class KafkaRequestHandlerPool(val brokerId: Int,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1fe1ca9..09e261f 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -110,6 +110,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     info("shutting down")
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
+      if(socketServer != null)
+        Utils.swallow(socketServer.shutdown())
       if(requestHandlerPool != null)
         Utils.swallow(requestHandlerPool.shutdown())
       Utils.swallow(kafkaScheduler.shutdown())
@@ -119,8 +121,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       Utils.swallow(kafkaZookeeper.shutdown())
       if(replicaManager != null)
         Utils.swallow(replicaManager.shutdown())
-      if(socketServer != null)
-        Utils.swallow(socketServer.shutdown())
       if(logManager != null)
        Utils.swallow(logManager.shutdown())
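
The pattern the patch relies on, stripped of Kafka specifics, is a lock-and-condition hand-off: a receiver blocks on a condition until either an item arrives or a shutdown flag is set, and `close()` wakes all blocked receivers so they can observe the flag and return `null`. Below is a minimal, self-contained sketch of that idea only; the class and method names (`ShutdownAwareQueue`, `receive`, `close`) are hypothetical and not part of the patch.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical illustration of the hand-off used by the new RequestChannel code:
// a condition variable guards the queue, receive() blocks until an item arrives
// or close() flips the shutdown flag, and a null return tells workers to exit.
class ShutdownAwareQueue[T >: Null] {
  private val lock = new ReentrantLock
  private val nonEmpty = lock.newCondition()
  private val shuttingDown = new AtomicBoolean(false)
  private val items = mutable.Queue.empty[T]

  def put(item: T): Unit = {
    lock.lock()
    try {
      items.enqueue(item)
      nonEmpty.signalAll()        // wake any blocked receiver
    } finally lock.unlock()
  }

  /** Blocks until an item is available; returns null once close() has been called. */
  def receive(): T = {
    lock.lock()
    try {
      while (items.isEmpty && !shuttingDown.get)
        nonEmpty.await()
      if (items.nonEmpty) items.dequeue() else null
    } finally lock.unlock()
  }

  def close(): Unit = {
    lock.lock()
    try {
      shuttingDown.set(true)
      nonEmpty.signalAll()        // release all blocked receivers
    } finally lock.unlock()
  }
}
```

A worker thread would then loop with `val item = queue.receive(); if (item != null) handle(item)` and exit cleanly once `close()` wakes it with an empty queue, which mirrors how the reworked `KafkaRequestHandler.run` treats a `null` request.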