From 01a94e88344dbdbff26d0dbfeefc4b348df99e6e Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Thu, 7 May 2015 13:15:55 -0700 Subject: [PATCH 1/2] KAFKA-2169: Moving to zkClient 0.5 release. --- build.gradle | 2 +- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 4 ++++ core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala | 5 +++++ core/src/main/scala/kafka/controller/KafkaController.scala | 5 +++++ core/src/main/scala/kafka/server/KafkaHealthcheck.scala | 5 +++++ 5 files changed, 20 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index fef515b..cd2aa83 100644 --- a/build.gradle +++ b/build.gradle @@ -207,7 +207,7 @@ project(':core') { compile project(':clients') compile "org.scala-lang:scala-library:$scalaVersion" compile 'org.apache.zookeeper:zookeeper:3.4.6' - compile 'com.101tec:zkclient:0.3' + compile 'com.101tec:zkclient:0.5' compile 'com.yammer.metrics:metrics-core:2.2.0' compile 'net.sf.jopt-simple:jopt-simple:3.2' diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index aa8d940..8071239 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -507,6 +507,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, // The child change watchers will be set inside rebalance when we read the children list. } + @throws(classOf[RuntimeException]) + override def handleSessionEstablishmentError(error: Throwable): Unit = { + throw new RuntimeException("zkClient reconnection failed, is zookeeper having issues?", error) + } } class ZKTopicPartitionChangeListener(val loadBalancerListener: ZKRebalancerListener) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index 38f4ec0..599f7f0 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -93,6 +93,11 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient, } } } + + @throws(classOf[RuntimeException]) + override def handleSessionEstablishmentError(error: Throwable): Unit = { + throw new RuntimeException("zkClient reconnection failed, is zookeeper having issues?", error) + } } } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index a635116..b1c6778 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1112,6 +1112,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt controllerElector.elect } } + + @throws(classOf[RuntimeException]) + override def handleSessionEstablishmentError(error: Throwable): Unit = { + throw new RuntimeException("zkClient reconnection failed, is zookeeper having issues?", error) + } } private def checkAndTriggerPartitionRebalance(): Unit = { diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 861b7f6..ef65632 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -89,6 +89,11 @@ class KafkaHealthcheck(private val brokerId: Int, info("done re-registering broker") info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath)) } + + @throws(classOf[RuntimeException]) + override def handleSessionEstablishmentError(error: Throwable): Unit = { + throw new RuntimeException("zkClient reconnection failed, is zookeeper having issues?", error) + } } } -- 2.1.3.36.g8e36a6d From 9ad6d585865c5f326dab733fbca1277af4248e01 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Mon, 11 May 2015 11:32:52 -0700 Subject: [PATCH 2/2] System.exit instead of throwing RuntimeException when zokeeper session establishment fails. --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala | 3 ++- core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala | 3 ++- core/src/main/scala/kafka/controller/KafkaController.scala | 3 ++- core/src/main/scala/kafka/server/KafkaHealthcheck.scala | 3 ++- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 8071239..ae3ffa5 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -509,7 +509,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, @throws(classOf[RuntimeException]) override def handleSessionEstablishmentError(error: Throwable): Unit = { - throw new RuntimeException("zkClient reconnection failed, is zookeeper having issues?", error) + fatal("Could not establish session with zookeeper", error) + System.exit(-1) } } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index 599f7f0..599b91c 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -96,7 +96,8 @@ class ZookeeperTopicEventWatcher(val zkClient: ZkClient, @throws(classOf[RuntimeException]) override def handleSessionEstablishmentError(error: Throwable): Unit = { - throw new RuntimeException("zkClient reconnection failed, is zookeeper having issues?", error) + fatal("Could not establish session with zookeeper", error) + System.exit(-1) } } } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index b1c6778..810ea30 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1115,7 +1115,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt @throws(classOf[RuntimeException]) override def handleSessionEstablishmentError(error: Throwable): Unit = { - throw new RuntimeException("zkClient reconnection failed, is zookeeper having issues?", error) + fatal("Could not establish session with zookeeper", error) + System.exit(-1) } } diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index ef65632..6596d86 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -92,7 +92,8 @@ class KafkaHealthcheck(private val brokerId: Int, @throws(classOf[RuntimeException]) override def handleSessionEstablishmentError(error: Throwable): Unit = { - throw new RuntimeException("zkClient reconnection failed, is zookeeper having issues?", error) + fatal("Could not establish session with zookeeper", error) + System.exit(-1) } } -- 2.1.3.36.g8e36a6d