diff --git a/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala b/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala
index bdd8b57..581436e 100644
--- a/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala
+++ b/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala
@@ -25,7 +25,7 @@ import kafka.log.LogConfig
 import kafka.server.ConfigType
 import kafka.utils.{Logging, ZkUtils}
 import org.apache.zookeeper.KeeperException.Code
-import org.apache.zookeeper.data.Stat
+import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException}
 
 import scala.collection.mutable
@@ -34,6 +34,24 @@ import scala.collection.mutable.ArrayBuffer
 class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean) extends Logging {
   import KafkaControllerZkUtils._
 
+  class ZooKeeperClientWrapper(val zkClient: ZookeeperClient) {
+    def apply[T](method: ZookeeperClient => T): T = method(zkClient)
+    def close(): Unit = {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+  private val zkClientWrap = new ZooKeeperClientWrapper(zookeeperClient);
+  val zkPath = new ZkPath(zkClientWrap);
+
+  private val UseDefaultAcls = new java.util.ArrayList[ACL]
+
+  def createSequentialPersistentPath(path: String, data: String = "", acls: java.util.List[ACL] = UseDefaultAcls): String = {
+    val acl = if (acls eq UseDefaultAcls) ZkUtils.defaultAcls(isSecure, path) else acls
+    zkPath.createPersistentSequential(path, data, acl)
+  }
+
+
   /**
    * Gets topic partition states for the given partitions.
    * @param partitions the partitions for which we want ot get states.
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index eee442a..6f28baf 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.locks.Lock
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{BrokerEndPoint, Partition, Replica}
-import kafka.controller.{KafkaController, StateChangeLogger}
+import kafka.controller.{KafkaController, KafkaControllerZkUtils, StateChangeLogger}
 import kafka.log.{Log, LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
@@ -139,7 +139,7 @@ object ReplicaManager {
 class ReplicaManager(val config: KafkaConfig,
                      metrics: Metrics,
                      time: Time,
-                     val zkUtils: ZkUtils,
+                     val zkUtils: KafkaControllerZkUtils,
                      scheduler: Scheduler,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean,
@@ -155,7 +155,7 @@ class ReplicaManager(val config: KafkaConfig,
   def this(config: KafkaConfig,
            metrics: Metrics,
            time: Time,
-           zkUtils: ZkUtils,
+           zkUtils: KafkaControllerZkUtils,
           scheduler: Scheduler,
           logManager: LogManager,
           isShuttingDown: AtomicBoolean,
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index cc08055..8c08c2c 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -18,7 +18,7 @@
 package kafka.utils
 
 import kafka.api.LeaderAndIsr
-import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
+import kafka.controller.{IsrChangeNotificationListener, KafkaControllerZkUtils, LeaderIsrAndControllerEpoch}
 import kafka.utils.ZkUtils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.data.Stat
@@ -39,7 +39,7 @@ object ReplicationUtils extends Logging {
     updatePersistentPath
   }
 
-  def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet: Set[TopicPartition]): Unit = {
+  def propagateIsrChanges(zkUtils: KafkaControllerZkUtils, isrChangeSet: Set[TopicPartition]): Unit = {
     val isrChangeNotificationPath: String = zkUtils.createSequentialPersistentPath(
       ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
       generateIsrChangeJson(isrChangeSet))
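
Taken together, the hunks above route ISR change notifications through the new KafkaControllerZkUtils.createSequentialPersistentPath, which resolves a sentinel ACL list to default ACLs before creating a persistent sequential znode. As a rough, self-contained illustration of that sentinel-default pattern (not part of the patch itself), the sketch below reimplements the idea against the raw ZooKeeper client; the object name, the isSecure handling, and the simplified ACL choice are assumptions for the example only.

// Illustrative sketch only -- mirrors the sentinel-based default-ACL pattern of
// createSequentialPersistentPath, using the raw ZooKeeper client instead of
// Kafka's ZookeeperClient/ZkPath internals.
import java.util.{List => JList}

import org.apache.zookeeper.data.ACL
import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}

object SequentialPathSketch {
  // Sentinel list: callers that pass nothing get default ACLs, detected by reference equality.
  private val UseDefaultAcls: JList[ACL] = new java.util.ArrayList[ACL]

  def createSequentialPersistentPath(zk: ZooKeeper,
                                     path: String,
                                     data: String = "",
                                     acls: JList[ACL] = UseDefaultAcls,
                                     isSecure: Boolean = false): String = {
    // Simplified stand-in for ZkUtils.defaultAcls(isSecure, path).
    val acl: JList[ACL] =
      if (acls eq UseDefaultAcls) {
        if (isSecure) ZooDefs.Ids.CREATOR_ALL_ACL else ZooDefs.Ids.OPEN_ACL_UNSAFE
      } else acls
    // PERSISTENT_SEQUENTIAL appends a monotonically increasing suffix and returns
    // the actual path that was created.
    zk.create(path, data.getBytes("UTF-8"), acl, CreateMode.PERSISTENT_SEQUENTIAL)
  }
}

In the patched ReplicationUtils.propagateIsrChanges, the equivalent call writes such a node under ZkUtils.IsrChangeNotificationPath, so each ISR change set lands in its own sequentially numbered child znode for the controller's IsrChangeNotificationListener to pick up.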