diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 6dae149..d7b2633 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -46,7 +46,9 @@ class SimpleConsumer(val host: String, } private def disconnect() = { - if(blockingChannel.isConnected) { + // The channel needs to be disconnected even if it is not connected but just created + // This is to avoid socket leak + if(blockingChannel.isCreated) { debug("Disconnecting from " + host + ":" + port) blockingChannel.disconnect() } @@ -66,9 +68,9 @@ class SimpleConsumer(val host: String, private def sendRequest(request: RequestOrResponse): Receive = { lock synchronized { - getOrMakeConnection() var response: Receive = null try { + getOrMakeConnection() blockingChannel.send(request) response = blockingChannel.receive() } catch { diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala index ab04b3f..9d31e63 100644 --- a/core/src/main/scala/kafka/network/BlockingChannel.scala +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala @@ -71,18 +71,24 @@ class BlockingChannel( val host: String, } def disconnect() = lock synchronized { - if(connected || channel != null) { - // closing the main socket channel *should* close the read channel - // but let's do it to be sure. + if(channel != null) { swallow(channel.close()) swallow(channel.socket.close()) - if(readChannel != null) swallow(readChannel.close()) - channel = null; readChannel = null; writeChannel = null - connected = false + channel = null + writeChannel = null } + // closing the main socket channel *should* close the read channel + // but let's do it to be sure. + if(readChannel != null) { + swallow(readChannel.close()) + readChannel = null + } + connected = false } def isConnected = connected + + def isCreated = channel != null || readChannel != null def send(request: RequestOrResponse):Int = { if(!connected) diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 041cfa5..cacb5d4 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -126,7 +126,9 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { */ private def disconnect() { try { - if(blockingChannel.isConnected) { + // The channel needs to be disconnected even if it is not connected but just created + // This is to avoid socket leak + if(blockingChannel.isCreated) { info("Disconnecting from " + config.host + ":" + config.port) blockingChannel.disconnect() } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3c3aafc..04a5d39 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -33,7 +33,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro } private def getLogRetentionTimeMillis(): Long = { - var millisInMinute = 60L * 1000L + val millisInMinute = 60L * 1000L val millisInHour = 60L * millisInMinute if(props.containsKey("log.retention.minutes")){ millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue)) diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 8c69d09..4acdd70 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -38,12 +38,18 @@ class KafkaHealthcheck(private val brokerId: Int, private val zkClient: ZkClient) extends Logging { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId + val sessionExpireListener = new SessionExpireListener def startup() { - zkClient.subscribeStateChanges(new SessionExpireListener) + zkClient.subscribeStateChanges(sessionExpireListener) register() } + def shutdown() { + zkClient.unsubscribeStateChanges(sessionExpireListener) + ZkUtils.deregisterBrokerInZk(zkClient, brokerId) + } + /** * Register this broker as "alive" in zookeeper */ diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 5e34f95..b79cc43 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -133,13 +133,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg // of time and try again for a configured number of retries. If all the attempt fails, we simply force // the shutdown. var remainingRetries = config.controlledShutdownMaxRetries + val alwaysRetry = config.controlledShutdownMaxRetries == -1 info("Starting controlled shutdown") - var channel : BlockingChannel = null; + var channel : BlockingChannel = null var prevController : Broker = null - var shutdownSuceeded : Boolean =false + var shutdownSuceeded : Boolean = false try { - while (!shutdownSuceeded && remainingRetries > 0) { - remainingRetries = remainingRetries - 1 + while (!shutdownSuceeded && (alwaysRetry || remainingRetries > 0)) { + if (!alwaysRetry) remainingRetries = remainingRetries - 1 // 1. Find the controller and establish a connection to it. @@ -218,6 +219,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val canShutdown = isShuttingDown.compareAndSet(false, true); if (canShutdown) { Utils.swallow(controlledShutdown()) + if(kafkaHealthcheck != null) + Utils.swallow(kafkaHealthcheck.shutdown()) if(socketServer != null) Utils.swallow(socketServer.shutdown()) if(requestHandlerPool != null) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index fa86bb9..5229873 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -210,6 +210,12 @@ object ZkUtils extends Logging { info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port)) } + def deregisterBrokerInZk(zkClient: ZkClient, id: Int) { + val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id + deletePath(zkClient, brokerIdPath) + info("Deregistered broker %d at path %s.".format(id, brokerIdPath)) + } + def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = { val topicDirs = new ZKGroupTopicDirs(group, topic) topicDirs.consumerOwnerDir + "/" + partition diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 17a99f1..db0e58b 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -56,11 +56,11 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { producerProps.put("request.required.acks", "-1") override def tearDown() { - super.tearDown() for(server <- servers) { server.shutdown() Utils.rm(server.config.logDirs(0)) } + super.tearDown() } def testHWCheckpointNoFailuresSingleLogSegment { diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 4e25b92..67d9c4b 100644 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -29,15 +29,15 @@ trait ZooKeeperTestHarness extends JUnit3Suite { val zkSessionTimeout = 6000 override def setUp() { + super.setUp zookeeper = new EmbeddedZookeeper(zkConnect) zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) - super.setUp } override def tearDown() { - super.tearDown Utils.swallow(zkClient.close()) Utils.swallow(zookeeper.shutdown()) + super.tearDown } }