From 4ba9ed9c8da60bc2431d3f5f05c179d080b5432d Mon Sep 17 00:00:00 2001 From: Vivek Madani Date: Wed, 20 May 2015 16:02:46 -0700 Subject: [PATCH] Fix for KAFKA-1737. Added a wrapper around creation of ZkClient to ensure ZkStringSerializer is set correctly --- .../scala/kafka/admin/ConsumerGroupCommand.scala | 2 +- .../PreferredReplicaLeaderElectionCommand.scala | 2 +- .../kafka/admin/ReassignPartitionsCommand.scala | 2 +- core/src/main/scala/kafka/admin/TopicCommand.scala | 2 +- .../consumer/ZookeeperConsumerConnector.scala | 2 +- .../consumer/ZookeeperTopicEventWatcher.scala | 2 +- core/src/main/scala/kafka/server/KafkaServer.scala | 4 +-- .../main/scala/kafka/tools/ConsoleConsumer.scala | 2 +- .../scala/kafka/tools/ConsumerOffsetChecker.scala | 2 +- .../main/scala/kafka/tools/ExportZkOffsets.scala | 4 +-- .../main/scala/kafka/tools/ImportZkOffsets.scala | 4 +-- .../main/scala/kafka/tools/UpdateOffsetsInZK.scala | 6 ++-- .../kafka/tools/VerifyConsumerRebalance.scala | 4 +-- core/src/main/scala/kafka/utils/ZkUtils.scala | 9 ++++-- core/src/test/scala/other/kafka/DeleteZKPath.scala | 5 ++-- .../test/scala/other/kafka/TestOffsetManager.scala | 4 +-- .../consumer/ZookeeperConsumerConnectorTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 2 +- .../test/scala/unit/kafka/zk/ZKEphemeralTest.scala | 8 ++--- core/src/test/scala/unit/kafka/zk/ZKPathTest.scala | 34 +++++++++------------- .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 4 +-- 21 files changed, 50 insertions(+), 56 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 1c3b380..6d1c6ab 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -48,7 +48,7 @@ object ConsumerGroupCommand { opts.checkArgs() - val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000) try { if (opts.options.has(opts.listOpt)) diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 3b3cd67..2aa6e62 100755 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -53,7 +53,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { var zkClient: ZkClient = null try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000) val partitionsForPreferredReplicaElection = if (!options.has(jsonFileOpt)) ZkUtils.getAllPartitions(zkClient) diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index acaa611..912b718 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -38,7 +38,7 @@ object ReassignPartitionsCommand extends Logging { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) val zkConnect = opts.options.valueOf(opts.zkConnectOpt) - var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + var zkClient: ZkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000) try { if(opts.options.has(opts.verifyOpt)) verifyAssignment(zkClient, opts) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 8e6f186..dacbdd0 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -47,7 +47,7 @@ object TopicCommand { opts.checkArgs() - val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000) try { if(opts.options.has(opts.createOpt)) diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 0b0dca1..a7f2acc 100755 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -178,7 +178,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private def connectZk() { info("Connecting to zookeeper instance at " + config.zkConnect) - zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) } // Blocks until the offset manager is located and a channel is established to it. diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index f2fa36f..f74823b 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -18,7 +18,7 @@ package kafka.consumer import scala.collection.JavaConversions._ -import kafka.utils.{ZkUtils, ZKStringSerializer, Logging} +import kafka.utils.{ZkUtils, Logging} import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} import org.apache.zookeeper.Watcher.Event.KeeperState diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index ea6d165..e66710d 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -196,13 +196,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg if (chroot.length > 1) { val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/")) - val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + val zkClientForChrootCreation = ZkUtils.createZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot) info("Created zookeeper path " + chroot) zkClientForChrootCreation.close() } - val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) ZkUtils.setupCommonPaths(zkClient) zkClient } diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index bba3990..a3bee58 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -209,7 +209,7 @@ object ConsoleConsumer extends Logging { def checkZkPathExists(zkUrl: String, path: String): Boolean = { try { - val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer) + val zk = ZkUtils.createZkClient(zkUrl, 30*1000,30*1000); zk.exists(path) } catch { case _: Throwable => false diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index d2bac85..ad64cee 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -149,7 +149,7 @@ object ConsumerOffsetChecker extends Logging { var zkClient: ZkClient = null var channel: BlockingChannel = null try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000) val topicList = topics match { case Some(x) => x.split(",").view.toList diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala index ce14bbc..7b52fe4 100644 --- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala @@ -19,7 +19,7 @@ package kafka.tools import java.io.FileWriter import joptsimple._ -import kafka.utils.{Logging, ZkUtils, ZKStringSerializer, ZKGroupTopicDirs, CommandLineUtils} +import kafka.utils.{Logging, ZkUtils, ZKGroupTopicDirs, CommandLineUtils} import org.I0Itec.zkclient.ZkClient @@ -76,7 +76,7 @@ object ExportZkOffsets extends Logging { val fileWriter : FileWriter = new FileWriter(outfile) try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000) var consumerGroups: Seq[String] = null diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala index 598350d..b56f587 100644 --- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala @@ -20,7 +20,7 @@ package kafka.tools import java.io.BufferedReader import java.io.FileReader import joptsimple._ -import kafka.utils.{Logging, ZkUtils,ZKStringSerializer, CommandLineUtils} +import kafka.utils.{Logging, ZkUtils, CommandLineUtils} import org.I0Itec.zkclient.ZkClient @@ -68,7 +68,7 @@ object ImportZkOffsets extends Logging { val zkConnect = options.valueOf(zkConnectOpt) val partitionOffsetFile = options.valueOf(inFileOpt) - val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000) val partitionOffsets: Map[String,String] = getPartitionOffsetsFromFile(partitionOffsetFile) updateZkOffsets(zkClient, partitionOffsets) diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala index 9235ed9..9942686 100755 --- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala +++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala @@ -21,7 +21,7 @@ import org.I0Itec.zkclient.ZkClient import kafka.consumer.{SimpleConsumer, ConsumerConfig} import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest} import kafka.common.{TopicAndPartition, KafkaException} -import kafka.utils.{ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, CoreUtils} +import kafka.utils.{ZKGroupTopicDirs, ZkUtils, CoreUtils} import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.utils.Utils @@ -36,8 +36,8 @@ object UpdateOffsetsInZK { if(args.length < 3) usage val config = new ConsumerConfig(Utils.loadProps(args(1))) - val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, - config.zkConnectionTimeoutMs, ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, + config.zkConnectionTimeoutMs) args(0) match { case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2)) case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2)) diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala index 4fb519b..db2721f 100644 --- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala +++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala @@ -19,7 +19,7 @@ package kafka.tools import joptsimple.OptionParser import org.I0Itec.zkclient.ZkClient -import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, CommandLineUtils} +import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, CommandLineUtils} object VerifyConsumerRebalance extends Logging { def main(args: Array[String]) { @@ -48,7 +48,7 @@ object VerifyConsumerRebalance extends Logging { var zkClient: ZkClient = null try { - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zkConnect, 30000, 30000) debug("zkConnect = %s; group = %s".format(zkConnect, group)) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 2618dd3..78475e3 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -460,7 +460,7 @@ object ZkUtils extends Logging { def maybeDeletePath(zkUrl: String, dir: String) { try { - val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer) + val zk = createZkClient(zkUrl, 30*1000, 30*1000) zk.deleteRecursive(dir) zk.close() } catch { @@ -781,9 +781,14 @@ object ZkUtils extends Logging { } } } + + def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = { + val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer) + zkClient + } } -object ZKStringSerializer extends ZkSerializer { +private object ZKStringSerializer extends ZkSerializer { @throws(classOf[ZkMarshallingError]) def serialize(data : Object) : Array[Byte] = data.asInstanceOf[String].getBytes("UTF-8") diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala b/core/src/test/scala/other/kafka/DeleteZKPath.scala index 33c3ef8..fb8ab9f 100755 --- a/core/src/test/scala/other/kafka/DeleteZKPath.scala +++ b/core/src/test/scala/other/kafka/DeleteZKPath.scala @@ -18,7 +18,7 @@ package kafka import consumer.ConsumerConfig -import utils.{ZKStringSerializer, ZkUtils} +import utils.ZkUtils import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.utils.Utils @@ -32,8 +32,7 @@ object DeleteZKPath { val config = new ConsumerConfig(Utils.loadProps(args(0))) val zkPath = args(1) - val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) try { ZkUtils.deletePathRecursive(zkClient, zkPath); diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index 9881bd3..c483132 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -2,7 +2,7 @@ package other.kafka import org.I0Itec.zkclient.ZkClient import kafka.api._ -import kafka.utils.{ShutdownableThread, ZKStringSerializer} +import kafka.utils.ShutdownableThread import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection._ import kafka.client.ClientUtils @@ -238,7 +238,7 @@ object TestOffsetManager { var fetchThread: FetchThread = null var statsThread: StatsThread = null try { - zkClient = new ZkClient(zookeeper, 6000, 2000, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zookeeper, 6000, 2000) commitThreads = (0 to (threadCount-1)).map { threadId => new CommitThread(threadId, partitionCount, commitIntervalMs, zkClient) } diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index 7f9fca3..359b0f5 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -316,7 +316,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar } def testLeaderSelectionForPartition() { - val zkClient = new ZkClient(zkConnect, 6000, 30000, ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(zkConnect, 6000, 30000) // create topic topic1 with 1 partition on broker 0 createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index faae0e9..17e9fe4 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -448,7 +448,7 @@ object TestUtils extends Logging { } def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = { - val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) + val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs) ZkUtils.updatePersistentPath(zkClient, path, offset.toString) } diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala index 85eec6f..2be1619 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala @@ -19,7 +19,7 @@ package kafka.zk import kafka.consumer.ConsumerConfig import org.I0Itec.zkclient.ZkClient -import kafka.utils.{ZkUtils, ZKStringSerializer} +import kafka.utils.ZkUtils import kafka.utils.TestUtils import org.junit.Assert import org.scalatest.junit.JUnit3Suite @@ -29,8 +29,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness { def testEphemeralNodeCleanup = { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) - var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - ZKStringSerializer) + var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs) try { ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created") @@ -42,8 +41,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness { testData = ZkUtils.readData(zkClient, "/tmp/zktest")._1 Assert.assertNotNull(testData) zkClient.close - zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs) val nodeExists = ZkUtils.pathExists(zkClient, "/tmp/zktest") Assert.assertFalse(nodeExists) } diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala index a2d062f..64c3ba2 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala @@ -19,7 +19,7 @@ package unit.kafka.zk import junit.framework.Assert import kafka.consumer.ConsumerConfig -import kafka.utils.{ZkPath, TestUtils, ZKStringSerializer, ZkUtils} +import kafka.utils.{ZkPath, TestUtils, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.config.ConfigException @@ -34,9 +34,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { def testCreatePersistentPathThrowsException { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) - var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs, - config.zkConnectionTimeoutMs, - ZKStringSerializer) + var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs, + config.zkConnectionTimeoutMs) try { ZkPath.resetNamespaceCheckedState ZkUtils.createPersistentPath(zkClient, path) @@ -49,8 +48,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { def testCreatePersistentPath { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) - var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - ZKStringSerializer) + var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs) try { ZkPath.resetNamespaceCheckedState ZkUtils.createPersistentPath(zkClient, path) @@ -64,9 +62,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { def testMakeSurePersistsPathExistsThrowsException { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) - var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs, - config.zkConnectionTimeoutMs, - ZKStringSerializer) + var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs, + config.zkConnectionTimeoutMs) try { ZkPath.resetNamespaceCheckedState ZkUtils.makeSurePersistentPathExists(zkClient, path) @@ -79,8 +76,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { def testMakeSurePersistsPathExists { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) - var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - ZKStringSerializer) + var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs) try { ZkPath.resetNamespaceCheckedState ZkUtils.makeSurePersistentPathExists(zkClient, path) @@ -94,9 +90,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { def testCreateEphemeralPathThrowsException { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) - var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs, - config.zkConnectionTimeoutMs, - ZKStringSerializer) + var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs, + config.zkConnectionTimeoutMs) try { ZkPath.resetNamespaceCheckedState ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata") @@ -109,8 +104,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { def testCreateEphemeralPathExists { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) - var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - ZKStringSerializer) + var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs) try { ZkPath.resetNamespaceCheckedState ZkUtils.createEphemeralPathExpectConflict(zkClient, path, "somedata") @@ -124,9 +118,8 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { def testCreatePersistentSequentialThrowsException { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnectWithInvalidRoot, "test", "1")) - var zkClient = new ZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs, - config.zkConnectionTimeoutMs, - ZKStringSerializer) + var zkClient = ZkUtils.createZkClient(zkConnectWithInvalidRoot, zkSessionTimeoutMs, + config.zkConnectionTimeoutMs) try { ZkPath.resetNamespaceCheckedState ZkUtils.createSequentialPersistentPath(zkClient, path) @@ -139,8 +132,7 @@ class ZKPathTest extends JUnit3Suite with ZooKeeperTestHarness { def testCreatePersistentSequentialExists { val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1")) - var zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs, - ZKStringSerializer) + var zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs) var actualPath: String = "" try { diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 86bddea..09bd375 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -19,7 +19,7 @@ package kafka.zk import org.scalatest.junit.JUnit3Suite import org.I0Itec.zkclient.ZkClient -import kafka.utils.{ZKStringSerializer, CoreUtils} +import kafka.utils.CoreUtils trait ZooKeeperTestHarness extends JUnit3Suite { var zkPort: Int = -1 @@ -34,7 +34,7 @@ trait ZooKeeperTestHarness extends JUnit3Suite { super.setUp zookeeper = new EmbeddedZookeeper() zkPort = zookeeper.port - zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) + zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout) } override def tearDown() { -- 1.9.5 (Apple Git-50.3)