From d5e16394784a325188ea19273d22a9b3d9eba3a1 Mon Sep 17 00:00:00 2001
From: Jaikiran Pai
Date: Sat, 31 Jan 2015 19:12:48 +0530
Subject: [PATCH] KAFKA-1907 Introduce a custom ZkClient for Kafka which allows
 timeouts on operations. Also mark certain Kafka threads as daemon to allow
 proper JVM shutdown

---
 .../main/scala/kafka/network/SocketServer.scala    |   4 +-
 .../main/scala/kafka/server/DelayedOperation.scala |   2 +
 .../main/scala/kafka/server/KafkaHealthcheck.scala |   5 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  12 +-
 .../src/main/scala/kafka/utils/KafkaZkClient.scala | 162 +++++++++++++++++++++
 core/src/main/scala/kafka/utils/Utils.scala        |   4 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  41 ++++--
 7 files changed, 205 insertions(+), 25 deletions(-)
 create mode 100644 core/src/main/scala/kafka/utils/KafkaZkClient.scala

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 39b1651..ce3b4c5 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -72,7 +72,7 @@ class SocketServer(val brokerId: Int,
                                    requestChannel,
                                    quotas,
                                    connectionsMaxIdleMs)
-      Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
+      Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), true).start()
     }
 
     newGauge("ResponsesBeingSent", new Gauge[Int] {
@@ -84,7 +84,7 @@ class SocketServer(val brokerId: Int,
 
     // start accepting connections
     this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
-    Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
+    Utils.newThread("kafka-socket-acceptor", acceptor, true).start()
     acceptor.awaitStartup
     info("Started")
   }
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index fc06b01..a9c3aec 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -249,6 +249,8 @@ class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeI
           "ExpirationReaper-%d".format(brokerId),
           false) {
 
+    this.setDaemon(true)
+
     /* The queue storing all delayed operations */
     private val delayedQueue = new DelayQueue[T]
 
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 4acdd70..8f6db27 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -35,7 +35,7 @@ class KafkaHealthcheck(private val brokerId: Int,
                        private val advertisedHost: String,
                        private val advertisedPort: Int,
                        private val zkSessionTimeoutMs: Int,
-                       private val zkClient: ZkClient) extends Logging {
+                       private val zkClient: KafkaZkClient) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
   val sessionExpireListener = new SessionExpireListener
@@ -47,7 +47,8 @@ class KafkaHealthcheck(private val brokerId: Int,
 
   def shutdown() {
     zkClient.unsubscribeStateChanges(sessionExpireListener)
-    ZkUtils.deregisterBrokerInZk(zkClient, brokerId)
+    // TODO: the timeout hardcoded here should instead be picked up from the configured connection timeout
+    ZkUtils.deregisterBrokerInZk(zkClient, brokerId, 5000)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 89200da..9f2299b 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -57,7 +57,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   var apis: KafkaApis = null
   var kafkaController: KafkaController = null
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
-  var zkClient: ZkClient = null
+  var zkClient: KafkaZkClient = null
   val brokerMetaPropsFile = "meta.properties"
   val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
 
@@ -143,7 +143,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     }
   }
 
-  private def initZk(): ZkClient = {
+  private def initZk(): KafkaZkClient = {
     info("Connecting to zookeeper on " + config.zkConnect)
 
     val chroot = {
@@ -155,13 +155,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
     if (chroot.length > 1) {
       val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
-      val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+      val zkClientForChrootCreation = new KafkaZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
       ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)
       info("Created zookeeper path " + chroot)
       zkClientForChrootCreation.close()
     }
 
-    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+    val zkClient = new KafkaZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
     ZkUtils.setupCommonPaths(zkClient)
     zkClient
   }
@@ -198,7 +198,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
             // Get the current controller info. This is to ensure we use the most recent info to issue the
             // controlled shutdown request
-            val controllerId = ZkUtils.getController(zkClient)
+            val controllerId = ZkUtils.getController(zkClient, config.zkConnectionTimeoutMs)
             ZkUtils.getBrokerInfo(zkClient, controllerId) match {
               case Some(broker) =>
                 if (channel == null || prevController == null || !prevController.equals(broker)) {
@@ -375,7 +375,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
    *   • config has broker.id and meta.properties contains broker.id; if they don't match, throws InconsistentBrokerIdException
    *   • config has broker.id and there is no meta.properties file; creates a new meta.properties and stores broker.id
    *
-   * @returns A brokerId.
+   * @return A brokerId.
    */
   private def getBrokerId: Int = {
     var brokerId = config.brokerId
diff --git a/core/src/main/scala/kafka/utils/KafkaZkClient.scala b/core/src/main/scala/kafka/utils/KafkaZkClient.scala
new file mode 100644
index 0000000..52e3ef1
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/KafkaZkClient.scala
@@ -0,0 +1,162 @@
+package kafka.utils
+
+import java.util
+import java.util.concurrent.{Callable, TimeUnit, TimeoutException}
+
+import org.I0Itec.zkclient.exception.ZkNoNodeException
+import org.I0Itec.zkclient.serialize.ZkSerializer
+import org.I0Itec.zkclient.{IZkChildListener, IZkConnection, IZkDataListener, ZkClient}
+import org.apache.zookeeper.data.Stat
+
+import scala.collection.mutable
+
+/**
+ * An enhanced {@link ZkClient} that allows timeouts on operations which, in the plain
+ * {@link ZkClient}, can potentially block forever.
+ *
+ * @author Jaikiran Pai
+ */
+class KafkaZkClient(zkServers: String,
+                    sessionTimeoutInMillis: Int,
+                    connectionTimeoutInMillis: Int,
+                    zkSerializer: ZkSerializer)
+  extends ZkClient(zkServers, sessionTimeoutInMillis, connectionTimeoutInMillis, zkSerializer) {
+
+  private val dataListenerPaths = new mutable.HashSet[String] with mutable.SynchronizedSet[String]
+  private val childListenerPaths = new mutable.HashSet[String] with mutable.SynchronizedSet[String]
+
+  // overridden to keep track of the paths that have child listeners
+  override def subscribeChildChanges(path: String, listener: IZkChildListener): util.List[String] = {
+    val result = super.subscribeChildChanges(path, listener)
+    childListenerPaths.add(path)
+    result
+  }
+
+  // overridden to keep track of the paths that have child listeners
+  override def unsubscribeChildChanges(path: String, childListener: IZkChildListener): Unit = {
+    super.unsubscribeChildChanges(path, childListener)
+    childListenerPaths.remove(path)
+  }
+
+  // overridden to keep track of the paths that have data listeners
+  override def subscribeDataChanges(path: String, listener: IZkDataListener): Unit = {
+    super.subscribeDataChanges(path, listener)
+    dataListenerPaths.add(path)
+  }
+
+  // overridden to keep track of the paths that have data listeners
+  override def unsubscribeDataChanges(path: String, dataListener: IZkDataListener): Unit = {
+    super.unsubscribeDataChanges(path, dataListener)
+    dataListenerPaths.remove(path)
+  }
+
+  // overridden to keep the tracked listener paths in sync
+  override def unsubscribeAll(): Unit = {
+    super.unsubscribeAll()
+    dataListenerPaths.clear()
+    childListenerPaths.clear()
+  }
+
+  /**
+   * This is the same as calling {@link #waitUntilConnected(long, TimeUnit)} with the connection
+   * timeout that this client was constructed with
+   */
+  override def waitUntilConnected(): Unit = {
+    waitUntilConnected(connectionTimeoutInMillis, TimeUnit.MILLISECONDS)
+  }
+
+  /**
+   * Tries to read the data at path within the given timeout. If the timeout is <= 0 then this method
+   * blocks until the data is available.
+   * If the read doesn't complete within the given timeout, this method throws a
+   * {@link TimeoutException}; otherwise it returns the data that was read.
+   *
+   * @param path The path from which to read the data
+   * @param stat The {@link Stat} to populate with the znode's metadata
+   * @param timeoutInMillis The timeout in milliseconds
+   * @tparam T The expected type of the data
+   * @return The data, if available within the specified timeout
+   */
+  def readData[T >: Null](path: String, stat: Stat, timeoutInMillis: Long = -1): T = {
+    if (timeoutInMillis <= 0) {
+      // no timeout requested, delegate to the plain (potentially forever blocking) read
+      return super.readData(path, stat)
+    }
+    val expiryTime = System.currentTimeMillis() + timeoutInMillis
+    val expirableReadTask = new ExpirableReadDataTask(_connection, path, stat, hasListeners(path), expiryTime)
+    val data: Array[Byte] = retryUntilConnected(expirableReadTask)
+    if (data == null) {
+      return null
+    }
+    zkSerializer.deserialize(data).asInstanceOf[T]
+  }
+
+  /**
+   * Tries to delete the path within the given timeout. If the timeout is <= 0 then this method
+   * blocks until the delete completes. If the delete doesn't complete within the given timeout,
+   * this method throws a {@link TimeoutException}.
+   *
+   * @param path The path to delete
+   * @param timeoutInMillis The timeout in milliseconds
+   * @return True if the path was deleted, false if it didn't exist
+   */
+  def delete(path: String, timeoutInMillis: Long = -1): Boolean = {
+    if (timeoutInMillis <= 0) {
+      // no timeout requested, delegate to the plain (potentially forever blocking) delete
+      return super.delete(path)
+    }
+    val expiryTime = System.currentTimeMillis() + timeoutInMillis
+    try {
+      retryUntilConnected(new ExpirableDeletePathTask(_connection, path, expiryTime))
+      true
+    } catch {
+      case _: ZkNoNodeException => false
+    }
+  }
+
+  // note that only the paths are tracked, so a path is reported as listened-to until
+  // unsubscribeChildChanges/unsubscribeDataChanges removes it, even if other listeners remain on it
+  private def hasListeners(path: String): Boolean = {
+    dataListenerPaths.contains(path) || childListenerPaths.contains(path)
+  }
+}
+
+// A task that gets invoked, possibly repeatedly, from within ZkClient's retryUntilConnected method.
+// If it's invoked after its specified expiry time, it throws a TimeoutException instead of
+// attempting the operation again
+private abstract class ExpirableTask[T](expiryTime: Long) extends Callable[T] {
+  override def call(): T = {
+    if (System.currentTimeMillis() >= expiryTime) {
+      throw new TimeoutException(taskDescription + " timed out")
+    }
+    doCall()
+  }
+
+  protected def doCall(): T
+
+  protected def taskDescription(): String
+}
+
+private class ExpirableReadDataTask(zkConnection: IZkConnection,
+                                    path: String,
+                                    stat: Stat,
+                                    watch: Boolean,
+                                    expiryTime: Long) extends ExpirableTask[Array[Byte]](expiryTime) {
+
+  protected override def doCall(): Array[Byte] = {
+    zkConnection.readData(path, stat, watch)
+  }
+
+  protected override def taskDescription(): String = {
+    "Read data at path " + path
+  }
+}
+
+private class ExpirableDeletePathTask(zkConnection: IZkConnection,
+                                      path: String,
+                                      expiryTime: Long) extends ExpirableTask[Unit](expiryTime) {
+  protected override def doCall(): Unit = {
+    zkConnection.delete(path)
+  }
+
+  protected override def taskDescription(): String = {
+    "Delete path " + path
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 738c1af..ab0db60 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -84,7 +84,7 @@ object Utils extends Logging {
    * Create a new thread
    * @param name The name of the thread
    * @param runnable The work for the thread to do
-   * @param daemon Should the thread block JVM shutdown?
+   * @param daemon Whether to mark the thread as a daemon; a non-daemon thread blocks JVM shutdown
    * @return The unstarted thread
    */
   def newThread(name: String, runnable: Runnable, daemon: Boolean): Thread = {
@@ -101,7 +101,7 @@ object Utils extends Logging {
   /**
    * Create a new thread
    * @param runnable The work for the thread to do
-   * @param daemon Should the thread block JVM shutdown?
+   * @param daemon Whether to mark the thread as a daemon; a non-daemon thread blocks JVM shutdown
    * @return The unstarted thread
    */
   def newThread(runnable: Runnable, daemon: Boolean): Thread = {
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index c14bd45..f1fdebf 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -62,8 +62,8 @@ object ZkUtils extends Logging {
   def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic
 
-  def getController(zkClient: ZkClient): Int = {
-    readDataMaybeNull(zkClient, ControllerPath)._1 match {
+  def getController(zkClient: ZkClient, timeoutInMillis: Long = -1): Int = {
+    readDataMaybeNull(zkClient, ControllerPath, timeoutInMillis)._1 match {
       case Some(controller) => KafkaController.parseControllerId(controller)
       case None => throw new KafkaException("Controller doesn't exist")
     }
@@ -189,9 +189,9 @@ object ZkUtils extends Logging {
     info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
-  def deregisterBrokerInZk(zkClient: ZkClient, id: Int) {
+  def deregisterBrokerInZk(zkClient: KafkaZkClient, id: Int, timeoutInMillis: Long = -1) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
-    deletePath(zkClient, brokerIdPath)
+    deletePath(zkClient, brokerIdPath, timeoutInMillis)
     info("Deregistered broker %d at path %s.".format(id, brokerIdPath))
   }
 
@@ -421,9 +421,15 @@ object ZkUtils extends Logging {
     }
   }
 
-  def deletePath(client: ZkClient, path: String): Boolean = {
+  def deletePath(client: ZkClient, path: String, timeoutInMillis: Long = -1): Boolean = {
     try {
-      client.delete(path)
+      // only a KafkaZkClient supports timeouts on operations
+      client match {
+        case kafkaZkClient: KafkaZkClient => kafkaZkClient.delete(path, timeoutInMillis)
+        // fall back to the plain ZkClient, which can block forever (for example, when ZooKeeper is down)
+        case _ => client.delete(path)
+      }
     } catch {
       case e: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
@@ -460,15 +466,24 @@ object ZkUtils extends Logging {
     (dataStr, stat)
   }
 
-  def readDataMaybeNull(client: ZkClient, path: String): (Option[String], Stat) = {
+  def readDataMaybeNull(client: ZkClient,
+                        path: String,
+                        timeoutInMillis: Long = -1): (Option[String], Stat) = {
     val stat: Stat = new Stat()
     val dataAndStat = try {
-      (Some(client.readData(path, stat)), stat)
-    } catch {
-      case e: ZkNoNodeException =>
-        (None, stat)
-      case e2: Throwable => throw e2
-    }
+      client match {
+        // a KafkaZkClient can bound the read with a timeout
+        case kafkaZkClient: KafkaZkClient => (Some(kafkaZkClient.readData(path, stat, timeoutInMillis)), stat)
+        // fall back to the plain ZkClient, which can block forever if ZooKeeper is down
+        case _ => (Some(client.readData(path, stat)), stat)
+      }
+    } catch {
+      case e: ZkNoNodeException =>
+        (None, stat)
+      case e2: Throwable => throw e2
+    }
     dataAndStat
   }
-- 
1.9.1
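
A minimal usage sketch (not part of the patch) of how the bounded operations above are meant to
behave. The ZooKeeper address, the /some/obsolete/path znode, the example object name, and the
timeout values are illustrative assumptions, not values taken from the patch or from KafkaConfig:

import java.util.concurrent.TimeoutException

import kafka.utils.{KafkaZkClient, ZKStringSerializer}
import org.apache.zookeeper.data.Stat

object KafkaZkClientTimeoutExample {
  def main(args: Array[String]): Unit = {
    // illustrative session/connection timeouts; a broker would use the values from KafkaConfig
    val zkClient = new KafkaZkClient("localhost:2181", 6000, 6000, ZKStringSerializer)
    try {
      val stat = new Stat()
      // bound the read to 5 seconds instead of retrying indefinitely
      val data: String = zkClient.readData("/controller", stat, 5000)
      println("controller znode: " + data + " (version " + stat.getVersion + ")")

      // bound the delete the same way; returns false if the znode does not exist
      val deleted = zkClient.delete("/some/obsolete/path", 5000)
      println("deleted: " + deleted)
    } catch {
      case e: TimeoutException =>
        // with timeoutInMillis <= 0 both calls would instead fall back to the plain blocking behaviour
        println("ZooKeeper operation timed out: " + e.getMessage)
    } finally {
      zkClient.close()
    }
  }
}

Note that the bound is enforced whenever the retry loop re-invokes the task: a retry that begins
after the expiry time fails with a TimeoutException rather than being attempted again, which is
what lets callers such as KafkaHealthcheck.shutdown() bound broker deregistration.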