From 3085d194a8aa410417f8081c91ced0eb47f6cddd Mon Sep 17 00:00:00 2001 From: Jaikiran Pai Date: Tue, 19 May 2015 10:41:42 +0530 Subject: [PATCH] KAFKA-1907 Set operation retry timeout on ZkClient. Also mark certain Kafka threads as daemon to allow proper JVM shutdown --- clients/src/main/java/org/apache/kafka/common/utils/Utils.java | 2 +- core/src/main/scala/kafka/network/SocketServer.scala | 4 ++-- core/src/main/scala/kafka/server/DelayedOperation.scala | 2 ++ core/src/main/scala/kafka/server/KafkaServer.scala | 8 ++++++-- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index f73eedb..65a7c88 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -402,7 +402,7 @@ public class Utils { * Create a new thread * @param name The name of the thread * @param runnable The work for the thread to do - * @param daemon Should the thread block JVM shutdown? + * @param daemon If this is false then the thread will block JVM shutdown * @return The unstarted thread */ public static Thread newThread(String name, Runnable runnable, Boolean daemon) { diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index edf6214..53b5836 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -87,7 +87,7 @@ class SocketServer(val brokerId: Int, quotas, connectionsMaxIdleMs, portToProtocol) - Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start() + Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), true).start() } } @@ -106,7 +106,7 @@ class SocketServer(val brokerId: Int, endpoints.values.foreach(endpoint => { val acceptor = new Acceptor(endpoint.host, endpoint.port, processors, sendBufferSize, recvBufferSize, quotas, endpoint.protocolType, portToProtocol) acceptors.put(endpoint, acceptor) - Utils.newThread("kafka-socket-acceptor-%s-%d".format(endpoint.protocolType.toString, endpoint.port), acceptor, false).start() + Utils.newThread("kafka-socket-acceptor-%s-%d".format(endpoint.protocolType.toString, endpoint.port), acceptor, true).start() acceptor.awaitStartup }) } diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 2ed9b46..931eaba 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -307,6 +307,8 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, br "ExpirationReaper-%d".format(brokerId), false) { + this.setDaemon(true) + override def doWork() { timeoutTimer.advanceClock(200L) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index ea6d165..14f5c74 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -46,6 +46,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg private val isShuttingDown = new AtomicBoolean(false) private val isStartingUp = new AtomicBoolean(false) + // TODO: For now, the retry timeout on ZkClient is a internal implementation detail and isn't exposed as a configurable property. + // Rethink this at a later date. + private val zkOperationRetryTimeoutInMillis = 5000; + private var shutdownLatch = new CountDownLatch(1) val brokerState: BrokerState = new BrokerState @@ -196,13 +200,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg if (chroot.length > 1) { val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/")) - val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer, zkOperationRetryTimeoutInMillis) ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot) info("Created zookeeper path " + chroot) zkClientForChrootCreation.close() } - val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer, zkOperationRetryTimeoutInMillis) ZkUtils.setupCommonPaths(zkClient) zkClient } -- 1.9.1