From b0d25f8b45316706a73791eec6433e0195fa9e3a Mon Sep 17 00:00:00 2001 From: Vivek Madani Date: Mon, 1 Dec 2014 17:28:18 +0530 Subject: [PATCH] Kafka-1737; Added createZkClient in ZkUtils to create a ZkClient instance with its serializer set to ZkStringSerializer. Also replaced all instances of "new ZkClient" with ZkUtils.createZkClient --- .../PreferredReplicaLeaderElectionCommand.scala | 2 +- .../kafka/admin/ReassignPartitionsCommand.scala | 2 +- core/src/main/scala/kafka/admin/TopicCommand.scala | 4 ++-- .../consumer/ZookeeperConsumerConnector.scala | 2 +- core/src/main/scala/kafka/server/KafkaServer.scala | 4 ++-- .../main/scala/kafka/tools/ConsoleConsumer.scala | 2 +- .../scala/kafka/tools/ConsumerOffsetChecker.scala | 2 +- .../main/scala/kafka/tools/ExportZkOffsets.scala | 2 +- .../main/scala/kafka/tools/ImportZkOffsets.scala | 2 +- .../main/scala/kafka/tools/UpdateOffsetsInZK.scala | 4 ++-- .../kafka/tools/VerifyConsumerRebalance.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 7 ++++++- core/src/test/scala/other/kafka/DeleteZKPath.scala | 3 +-- .../test/scala/other/kafka/TestOffsetManager.scala | 5 ++--- .../consumer/ZookeeperConsumerConnectorTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 2 +- .../test/scala/unit/kafka/zk/ZKEphemeralTest.scala | 6 ++---- .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 4 ++-- 18 files changed, 29 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 79b5e0a..3f91765 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -53,7 +53,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { var zkClient: ZkClient = null try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000) val partitionsForPreferredReplicaElection = if (!options.has(jsonFileOpt)) ZkUtils.getAllPartitions(zkClient) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 979992b..ec36324 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -37,7 +37,7 @@ object ReassignPartitionsCommand extends Logging { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) val zkConnect = opts.options.valueOf(opts.zkConnectOpt) - var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + var zkClient: ZkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000) try { if(opts.options.has(opts.verifyOpt)) verifyAssignment(zkClient, opts) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 285c033..8d306c5 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -48,8 +48,8 @@ object TopicCommand { opts.checkArgs() - val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) - + val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000) + try { if(opts.options.has(opts.createOpt)) createTopic(zkClient, opts) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 3e1718b..6265e49 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -168,7 +168,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def connectZk() { info("Connecting to zookeeper instance at " + config.zkConnect) - zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) } // Blocks until the offset manager is located and a channel is established to it. diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 1bf7d10..5cd9588 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -147,13 +147,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg if (chroot.length > 1) { val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/")) - val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + val zkClientForChrootCreation = ZkUtils.createZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot) info("Created zookeeper path " + chroot) zkClientForChrootCreation.close() } - val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) ZkUtils.setupCommonPaths(zkClient) zkClient } diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 323fc85..f57cef3 100644 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -209,7 +209,7 @@ object ConsoleConsumer extends Logging { def checkZkPathExists(zkUrl: String, path: String): Boolean = { try { - val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer); + val zk = ZkUtils.createZkClient(zkUrl, 30*1000,30*1000) zk.exists(path) } catch { case _: Throwable => false diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d1e7c43..1d0c102 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -149,7 +149,7 @@ object ConsumerOffsetChecker extends Logging { var zkClient: ZkClient = null var channel: BlockingChannel = null try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000) val topicList = topics match { case Some(x) => x.split(",").view.toList diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala index 4d051bc..f9fb8f4 100644 --- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala @@ -76,7 +76,7 @@ object ExportZkOffsets extends Logging { val fileWriter : FileWriter = new FileWriter(outfile) try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000) var consumerGroups: Seq[String] = null diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala index abe0972..19ad5c6 100644 --- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala @@ -68,7 +68,7 @@ object ImportZkOffsets extends Logging { val zkConnect = options.valueOf(zkConnectOpt) val partitionOffsetFile = options.valueOf(inFileOpt) - val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000) val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile) updateZkOffsets(zkClient, partitionOffsets) diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala index 111c9a8..7207840 100644 --- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala +++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala @@ -35,8 +35,8 @@ object UpdateOffsetsInZK { if(args.length < 3) usage val config = new ConsumerConfig(Utils.loadProps(args(1))) - val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, - config.zkConnectionTimeoutMs, ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, + config.zkConnectionTimeoutMs) args(0) match { case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2)) case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2)) diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala index aef8361..fdad2e9 100644 --- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala +++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala @@ -48,7 +48,7 @@ object VerifyConsumerRebalance extends Logging { var zkClient: ZkClient = null try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000) debug("zkConnect = %s; group = %s".format(zkConnect, group)) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 56e3e88..10be59f 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -436,7 +436,7 @@ object ZkUtils extends Logging { def maybeDeletePath(zkUrl: String, dir: String) { try { - val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) + val zk = createZkClient(zkUrl, 30*1000, 30*1000) zk.deleteRecursive(dir) zk.close() } catch { @@ -713,6 +713,11 @@ object ZkUtils extends Logging { }.flatten.toSet } } + + def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = { + val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer) + zkClient + } } object ZKStringSerializer extends ZkSerializer { diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala b/core/src/test/scala/other/kafka/DeleteZKPath.scala index 2554503..519a114 100644 --- a/core/src/test/scala/other/kafka/DeleteZKPath.scala +++ b/core/src/test/scala/other/kafka/DeleteZKPath.scala @@ -31,8 +31,7 @@ object DeleteZKPath { val config = new ConsumerConfig(Utils.loadProps(args(0))) val zkPath = args(1) - val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) try { ZkUtils.deletePathRecursive(zkClient, zkPath); diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index 41f334d..861ab01 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -2,7 +2,7 @@ package other.kafka import org.I0Itec.zkclient.ZkClient import kafka.api._ -import kafka.utils.{ShutdownableThread, ZKStringSerializer} +import kafka.utils.{ShutdownableThread, ZKStringSerializer, ZkUtils} import scala.collection._ import kafka.client.ClientUtils import joptsimple.OptionParser @@ -16,7 +16,6 @@ import com.yammer.metrics.core.Gauge import java.util.concurrent.atomic.AtomicInteger import java.nio.channels.ClosedByInterruptException - object TestOffsetManager { val random = new Random @@ -238,7 +237,7 @@ object TestOffsetManager { var fetchThread: FetchThread = null var statsThread: StatsThread = null try { - zkClient = new ZkClient(zookeeper, 6000, 2000, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zookeeper, 6000, 2000) commitThreads = (0 to (threadCount-1)).map { threadId => new CommitThread(threadId, partitionCount, commitIntervalMs, zkClient) } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index 8c4687b..96db5e2 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -314,7 +314,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar } def testLeaderSelectionForPartition() { - val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(zookeeperConnect, 6000, 30000) // create topic topic1 with 1 partition on broker 0 createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 0da774d..b39538d 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -426,7 +426,7 @@ object TestUtils extends Logging { } def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = { - val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) ZkUtils.updatePersistentPath(zkClient, path, offset.toString) } diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala index 85eec6f..49921fa 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala @@ -29,8 +29,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness { def testEphemeralNodeCleanup = { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) - var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - ZKStringSerializer) + var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs) try { ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created") @@ -42,8 +41,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness { testData = ZkUtils.readData(zkClient, "/tmp/zktest")._1 Assert.assertNotNull(testData) zkClient.close - zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs) val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest") Assert.assertFalse(nodeExists) } diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 67d9c4b..948900e 100644 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -19,7 +19,7 @@ package kafka.zk import org.scalatest.junit.JUnit3Suite import org.I0Itec.zkclient.ZkClient -import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils} +import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils, ZkUtils} trait ZooKeeperTestHarness extends JUnit3Suite { val zkConnect: String = TestZKUtils.zookeeperConnect @@ -31,7 +31,7 @@ trait ZooKeeperTestHarness extends JUnit3Suite { override def setUp() { super.setUp zookeeper = new EmbeddedZookeeper(zkConnect) - zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout) } override def tearDown() { -- 1.7.1