From e5eb373dcec7562292cec32f3962e42dda5cea24 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Thu, 7 May 2015 13:15:55 -0700 Subject: [PATCH] KAFKA-2169: Moving to zkClient 0.5 release. --- build.gradle | 2 +- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 4 ++++ core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala | 5 +++++ core/src/main/scala/kafka/controller/KafkaController.scala | 5 +++++ core/src/main/scala/kafka/server/KafkaHealthcheck.scala | 5 +++++ 5 files changed, 20 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index fef515b..cd2aa83 100644 --- a/build.gradle +++ b/build.gradle @@ -207,7 +207,7 @@ project(':core') { compile project(':clients') compile "org.scala-lang:scala-library:$scalaVersion" compile 'org.apache.zookeeper:zookeeper:3.4.6' - compile 'com.101tec:zkclient:0.3' + compile 'com.101tec:zkclient:0.5' compile 'com.yammer.metrics:metrics-core:2.2.0' compile 'net.sf.jopt-simple:jopt-simple:3.2' diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index aa8d940..8071239 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -507,6 +507,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // The child change watchers will be set inside rebalance when we read the children list. } + @throws(classOf[RuntimeException]) + override def handleSessionEstablishmentError(error: Throwable): Unit = { + throw new RuntimeException("zkClient reconnection failed, is zookeeper having issues?", error) + } } class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index 38f4ec0..599f7f0 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -93,6 +93,11 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient, } } } + + @throws(classOf[RuntimeException]) + override def handleSessionEstablishmentError(error: Throwable): Unit = { + throw new RuntimeException("zkClient reconnection failed, is zookeeper having issues?", error) + } } } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index a635116..b1c6778 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1112,6 +1112,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt controllerElector.elect } } + + @throws(classOf[RuntimeException]) + override def handleSessionEstablishmentError(error: Throwable): Unit = { + throw new RuntimeException("zkClient reconnection failed, is zookeeper having issues?", error) + } } private def checkAndTriggerPartitionRebalance(): Unit = { diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 861b7f6..ef65632 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -89,6 +89,11 @@ class KafkaHealthcheck(private val brokerId: Int, info("done re-registering broker") info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath)) } + + @throws(classOf[RuntimeException]) + override def handleSessionEstablishmentError(error: Throwable): Unit = { + throw new RuntimeException("zkClient reconnection failed, is zookeeper having issues?", error) + } } } -- 1.9.5 (Apple Git-50.3)