diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 6dae149..d7b2633 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -46,7 +46,9 @@ class SimpleConsumer(val host: String, } private def disconnect() = { - if(blockingChannel.isConnected) { + // The channel needs to be disconnected even if it is not connected but just created + // This is to avoid socket leak + if(blockingChannel.isCreated) { debug("Disconnecting from " + host + ":" + port) blockingChannel.disconnect() } @@ -66,9 +68,9 @@ class SimpleConsumer(val host: String, private def sendRequest(request: RequestOrResponse): Receive = { lock synchronized { - getOrMakeConnection() var response: Receive = null try { + getOrMakeConnection() blockingChannel.send(request) response = blockingChannel.receive() } catch { diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index ea8485b..2afbc9b 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -124,7 +124,8 @@ class RequestSendThread(val controllerId: Int, try{ lock synchronized { var isSendSuccessful = false - while(isRunning.get() && !isSendSuccessful) { + var giveupSend = false + while(isRunning.get() && !isSendSuccessful && !giveupSend) { // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying. try { @@ -138,6 +139,14 @@ class RequestSendThread(val controllerId: Int, channel.disconnect() connectToBroker(toBroker, channel) isSendSuccessful = false + // if the broker is shutting down, then it may be not reachable any more since its socket server + // may has shutdown, in this case stop retrying + if(controllerContext.shuttingDownBrokerIds.contains(toBroker.id)) { + warn(("Controller %d epoch %d give up to resend %s request with correlation id %s to " + + "broker %s since it is shuting down. ").format(controllerId, controllerContext.epoch, + RequestKeys.nameForKey(request.requestId.get), request.correlationId, toBroker.toString())) + giveupSend = true + } // backoff before retrying the connection and send Utils.swallow(Thread.sleep(300)) } diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala index d22dabd..9d31e63 100644 --- a/core/src/main/scala/kafka/network/BlockingChannel.scala +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala @@ -71,18 +71,24 @@ class BlockingChannel( val host: String, } def disconnect() = lock synchronized { - if(connected || channel != null) { - // closing the main socket channel *should* close the read channel - // but let's do it to be sure. + if(channel != null) { swallow(channel.close()) swallow(channel.socket.close()) + channel = null + writeChannel = null + } + // closing the main socket channel *should* close the read channel + // but let's do it to be sure. + if(readChannel != null) { swallow(readChannel.close()) - channel = null; readChannel = null; writeChannel = null - connected = false + readChannel = null } + connected = false } def isConnected = connected + + def isCreated = channel != null || readChannel != null def send(request: RequestOrResponse):Int = { if(!connected) diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 041cfa5..cacb5d4 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -126,7 +126,9 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { */ private def disconnect() { try { - if(blockingChannel.isConnected) { + // The channel needs to be disconnected even if it is not connected but just created + // This is to avoid socket leak + if(blockingChannel.isCreated) { info("Disconnecting from " + config.host + ":" + config.port) blockingChannel.disconnect() } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3c3aafc..04a5d39 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -33,7 +33,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro } private def getLogRetentionTimeMillis(): Long = { - var millisInMinute = 60L * 1000L + val millisInMinute = 60L * 1000L val millisInHour = 60L * millisInMinute if(props.containsKey("log.retention.minutes")){ millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue)) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 5e34f95..6e48936 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -133,13 +133,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg // of time and try again for a configured number of retries. If all the attempt fails, we simply force // the shutdown. var remainingRetries = config.controlledShutdownMaxRetries + val alwaysRetry = config.controlledShutdownMaxRetries == -1 info("Starting controlled shutdown") - var channel : BlockingChannel = null; + var channel : BlockingChannel = null var prevController : Broker = null - var shutdownSuceeded : Boolean =false + var shutdownSuceeded : Boolean = false try { - while (!shutdownSuceeded && remainingRetries > 0) { - remainingRetries = remainingRetries - 1 + while (!shutdownSuceeded && (alwaysRetry || remainingRetries > 0)) { + if (!alwaysRetry) remainingRetries = remainingRetries - 1 // 1. Find the controller and establish a connection to it.