diff --git a/core/build.sbt b/core/build.sbt index 405ea55..4f4bf49 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -12,7 +12,7 @@ libraryDependencies <+= scalaVersion("org.scala-lang" % "scala-compiler" % _ ) libraryDependencies ++= Seq( "org.apache.zookeeper" % "zookeeper" % "3.3.4", - "com.101tec" % "zkclient" % "0.2", + "com.github.sgroschupf" % "zkclient" % "0.1", "org.xerial.snappy" % "snappy-java" % "1.0.4.1", "com.yammer.metrics" % "metrics-core" % "2.2.0", "com.yammer.metrics" % "metrics-annotation" % "2.2.0", diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 41cb764..d39cbec 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -19,7 +19,7 @@ package kafka.admin import java.util.Random import kafka.utils.{Logging, ZkUtils} -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException import scala.collection._ import scala.collection.mutable diff --git a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala index 9e8ccc3..23262d8 100644 --- a/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala +++ b/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala @@ -2,7 +2,7 @@ package kafka.admin import joptsimple.OptionParser -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.utils._ import scala.collection.Map import kafka.common.TopicAndPartition diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala index e762115..45508bc 100644 --- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala @@ -19,7 +19,7 @@ package kafka.admin import joptsimple.OptionParser import kafka.utils._ -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import scala.collection.mutable import kafka.common.Topic diff --git a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala index 3da4518..240266e 100644 --- a/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/DeleteTopicCommand.scala @@ -18,7 +18,7 @@ package kafka.admin import joptsimple.OptionParser -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.utils.{Utils, ZKStringSerializer, ZkUtils} object DeleteTopicCommand { diff --git a/core/src/main/scala/kafka/admin/ListTopicCommand.scala b/core/src/main/scala/kafka/admin/ListTopicCommand.scala index c760cc0..3553b7e 100644 --- a/core/src/main/scala/kafka/admin/ListTopicCommand.scala +++ b/core/src/main/scala/kafka/admin/ListTopicCommand.scala @@ -18,7 +18,7 @@ package kafka.admin import joptsimple.OptionParser -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.utils.{Utils, ZKStringSerializer, ZkUtils} object ListTopicCommand { diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index d5de5f3..3c5e732 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -18,7 +18,7 @@ package kafka.admin import joptsimple.OptionParser import kafka.utils._ -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException import kafka.common.{TopicAndPartition, AdminCommandFailedException} import collection._ diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 8d287f4..89c0e7c 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -18,7 +18,7 @@ package kafka.admin import joptsimple.OptionParser import kafka.utils._ -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException import kafka.common.{TopicAndPartition, AdminCommandFailedException} diff --git a/core/src/main/scala/kafka/admin/ShutdownBroker.scala b/core/src/main/scala/kafka/admin/ShutdownBroker.scala index bb20edb..ec4e45b 100644 --- a/core/src/main/scala/kafka/admin/ShutdownBroker.scala +++ b/core/src/main/scala/kafka/admin/ShutdownBroker.scala @@ -20,7 +20,7 @@ package kafka.admin import joptsimple.OptionParser import kafka.utils._ -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import javax.management.remote.{JMXServiceURL, JMXConnectorFactory} import javax.management.ObjectName import kafka.controller.KafkaController diff --git a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala b/core/src/main/scala/kafka/common/KafkaZookeperClient.scala index bace1d2..59e19be 100644 --- a/core/src/main/scala/kafka/common/KafkaZookeperClient.scala +++ b/core/src/main/scala/kafka/common/KafkaZookeperClient.scala @@ -17,7 +17,7 @@ package kafka.common -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.utils.{ZKStringSerializer, ZKConfig} import java.util.concurrent.atomic.AtomicReference diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index 89ff382..dcbdfc8 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -18,7 +18,7 @@ package kafka.consumer import scala.collection.JavaConversions._ -import org.I0Itec.zkclient._ +import kafka.zookeeper.ZkClient import joptsimple._ import java.util.Properties import java.util.Random diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index db104f1..c5edcff 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -17,7 +17,7 @@ package kafka.consumer -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.server.{AbstractFetcherThread, AbstractFetcherManager} import kafka.cluster.{Cluster, Broker} import scala.collection.immutable diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index c8e8406..6fe0d65 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -18,7 +18,7 @@ package kafka.consumer import scala.collection._ -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.utils.{Json, ZKGroupDirs, ZkUtils, Logging} import kafka.common.KafkaException diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index e66680b..c832e23 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -25,7 +25,8 @@ import kafka.cluster._ import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNodeExistsException import java.net.InetAddress -import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} +import kafka.zookeeper.ZkClient +import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener} import org.apache.zookeeper.Watcher.Event.KeeperState import java.util.UUID import kafka.serializer._ diff --git a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala index df83baa..c717fe6 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala @@ -19,7 +19,8 @@ package kafka.consumer import scala.collection.JavaConversions._ import kafka.utils.{ZkUtils, ZKStringSerializer, Logging} -import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} +import kafka.zookeeper.ZkClient +import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener} import org.apache.zookeeper.Watcher.Event.KeeperState class ZookeeperTopicEventWatcher(val config:ConsumerConfig, diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index f334685..b84aae2 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -30,7 +30,8 @@ import kafka.server.{ZookeeperLeaderElector, KafkaConfig} import kafka.utils.ZkUtils._ import kafka.utils.{Utils, ZkUtils, Logging} import org.apache.zookeeper.Watcher.Event.KeeperState -import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient} +import kafka.zookeeper.ZkClient +import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener} import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} import java.util.concurrent.atomic.AtomicInteger import scala.Some diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 93e2f04..271c111 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -25,7 +25,7 @@ import scala.collection._ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic._ import kafka.metrics.KafkaMetricsGroup -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.common._ import kafka.utils.{ZkUtils, Pool, SystemTime, Logging} import kafka.network.RequestChannel.Response diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index b0348bb..8c60c9a 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -22,7 +22,7 @@ import kafka.log.LogManager import kafka.utils._ import java.util.concurrent._ import atomic.AtomicBoolean -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.controller.{ControllerStats, KafkaController} /** diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala index 0e6c656..4a4e5c4 100644 --- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala +++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala @@ -19,7 +19,8 @@ package kafka.server import kafka.utils._ import org.apache.zookeeper.Watcher.Event.KeeperState -import org.I0Itec.zkclient.{IZkStateListener, ZkClient} +import kafka.zookeeper.ZkClient +import org.I0Itec.zkclient.IZkStateListener import kafka.common._ import java.net.InetAddress diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8e49b83..9c4f179 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -19,7 +19,7 @@ package kafka.server import kafka.cluster.{Broker, Partition, Replica} import collection._ import mutable.HashMap -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import java.util.concurrent.atomic.AtomicBoolean import kafka.utils._ import kafka.log.LogManager diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index 0e6d9b8..417c89b 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -19,7 +19,7 @@ package kafka.tools import joptsimple._ -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.utils.{Json, ZkUtils, ZKStringSerializer, Logging} import kafka.consumer.SimpleConsumer import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest} diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala index 005231f..3abff96 100644 --- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala @@ -20,7 +20,7 @@ package kafka.tools import java.io.FileWriter import joptsimple._ import kafka.utils.{Logging, ZkUtils, ZKStringSerializer,ZKGroupTopicDirs} -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient /** diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala index 63519e1..f5832f9 100644 --- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala @@ -21,7 +21,7 @@ import java.io.BufferedReader import java.io.FileReader import joptsimple._ import kafka.utils.{Logging, ZkUtils,ZKStringSerializer} -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient /** diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala index 111c9a8..d639354 100644 --- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala +++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala @@ -17,7 +17,7 @@ package kafka.tools -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.consumer.{SimpleConsumer, ConsumerConfig} import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest} import kafka.common.{TopicAndPartition, KafkaException} diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala index eac9af2..df15e52 100644 --- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala +++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala @@ -18,7 +18,7 @@ package kafka.tools import joptsimple.OptionParser -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, ZKStringSerializer} object VerifyConsumerRebalance extends Logging { diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 4f6fcd4..d0a078a 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -19,7 +19,8 @@ package kafka.utils import kafka.cluster.{Broker, Cluster} import kafka.consumer.TopicCount -import org.I0Itec.zkclient.{IZkDataListener, ZkClient} +import kafka.zookeeper.ZkClient +import org.I0Itec.zkclient.IZkDataListener import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError} import org.I0Itec.zkclient.serialize.ZkSerializer import collection._ @@ -298,22 +299,17 @@ object ZkUtils extends Logging { * create parrent directory if necessary. Never throw NodeExistException. * Return the updated path zkVersion */ - def updatePersistentPath(client: ZkClient, path: String, data: String): Int = { - var stat: Stat = null + def updatePersistentPath(client: ZkClient, path: String, data: String) = { try { - stat = client.writeData(path, data) - return stat.getVersion + client.writeData(path, data) } catch { case e: ZkNoNodeException => { createParentPath(client, path) try { client.createPersistent(path, data) - // When the new path is created, its zkVersion always starts from 0 - return 0 } catch { case e: ZkNodeExistsException => - stat = client.writeData(path, data) - return stat.getVersion + client.writeData(path, data) case e2 => throw e2 } } @@ -327,7 +323,7 @@ object ZkUtils extends Logging { */ def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = { try { - val stat = client.writeData(path, data, expectVersion) + val stat = client.writeDataReturnStat(path, data, expectVersion) debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d" .format(path, data, expectVersion, stat.getVersion)) (true, stat.getVersion) @@ -345,7 +341,7 @@ object ZkUtils extends Logging { */ def conditionalUpdatePersistentPathIfExists(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = { try { - val stat = client.writeData(path, data, expectVersion) + val stat = client.writeDataReturnStat(path, data, expectVersion) debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d" .format(path, data, expectVersion, stat.getVersion)) (true, stat.getVersion) diff --git a/core/src/main/scala/kafka/zookeeper/ZkClient.java b/core/src/main/scala/kafka/zookeeper/ZkClient.java new file mode 100644 index 0000000..0c6e305 --- /dev/null +++ b/core/src/main/scala/kafka/zookeeper/ZkClient.java @@ -0,0 +1,35 @@ +package kafka.zookeeper; + +import org.I0Itec.zkclient.serialize.ZkSerializer; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.zookeeper.data.Stat; +import java.util.concurrent.Callable; + +/** + * Since zkclient 0.2.0 is not backward compatible with 0.1.0, this is a temporary solution to + * expose some zookeeper api until a backward compatible zkclient version is released. + */ +public class ZkClient extends org.I0Itec.zkclient.ZkClient { + private ZkSerializer _zkSerializer; + + public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, + ZkSerializer zkSerializer) + { + super(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer); + _zkSerializer = zkSerializer; + } + + public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion) { + final byte[] data = _zkSerializer.serialize(datat); + return (Stat) retryUntilConnected(new Callable() { + + @Override + public Object call() throws Exception { + Stat stat = ((ZkConnection) _connection).getZookeeper().setData(path, data, expectedVersion); + return stat; + } + }); + } + +} + diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala b/core/src/test/scala/other/kafka/DeleteZKPath.scala index 2554503..e78838a 100644 --- a/core/src/test/scala/other/kafka/DeleteZKPath.scala +++ b/core/src/test/scala/other/kafka/DeleteZKPath.scala @@ -19,7 +19,7 @@ package kafka import consumer.ConsumerConfig import utils.{ZKStringSerializer, ZkUtils, Utils} -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient object DeleteZKPath { def main(args: Array[String]) { diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index fcfc583..0e1242a 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -26,7 +26,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.message._ import kafka.serializer._ import kafka.admin.CreateTopicCommand -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.utils._ import kafka.producer.{ProducerConfig, KeyedMessage, Producer} import java.util.{Collections, Properties} diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index f764151..7f032e7 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -23,7 +23,7 @@ import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder} import kafka.server.{KafkaRequestHandler, KafkaConfig} import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import org.apache.log4j.{Level, Logger} -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite import scala.collection._ diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index f30b097..db1d228 100644 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -17,7 +17,7 @@ package kafka.server import kafka.log.LogManager -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import org.scalatest.junit.JUnit3Suite import org.easymock.EasyMock import org.junit._ diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index c7dd8a7..015d99c 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -22,7 +22,7 @@ import kafka.message.{ByteBufferMessageSet, Message} import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.utils.{ZkUtils, Time, TestUtils, MockTime} import org.easymock.EasyMock -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import org.scalatest.junit.JUnit3Suite import kafka.api._ import scala.Some diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 3cb1d4a..0c580e7 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -27,7 +27,7 @@ import junit.framework.Assert._ import kafka.server._ import kafka.producer._ import kafka.message._ -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.cluster.Broker import collection.mutable.ListBuffer import kafka.consumer.ConsumerConfig diff --git a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala index 85eec6f..933bb49 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala @@ -18,7 +18,7 @@ package kafka.zk import kafka.consumer.ConsumerConfig -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.utils.{ZkUtils, ZKStringSerializer} import kafka.utils.TestUtils import org.junit.Assert diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 63e528f..73ed628 100644 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -18,7 +18,7 @@ package kafka.zk import org.scalatest.junit.JUnit3Suite -import org.I0Itec.zkclient.ZkClient +import kafka.zookeeper.ZkClient import kafka.utils.{ZKStringSerializer, TestZKUtils} trait ZooKeeperTestHarness extends JUnit3Suite {