diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 32cab2a..8c2b45b 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -21,6 +21,7 @@ import kafka.log.LogConfig import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig} import kafka.utils._ import kafka.utils.ZkUtils._ +import kafka.zk.KafkaZkClient import java.util.Random import java.util.Properties @@ -57,7 +58,8 @@ trait AdminUtilities { } } - def fetchEntityConfig(zkUtils: ZkUtils,entityType: String, entityName: String): Properties + def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties + def fetchEntityConfigZkClient(zkClient: KafkaZkClient, entityType: String, entityName: String): Properties } object AdminUtils extends Logging with AdminUtilities { @@ -657,6 +659,30 @@ object AdminUtils extends Logging with AdminUtilities { props } + /** + * Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk + * sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>.
+ */ + def fetchEntityConfigZkClient(zkClient: KafkaZkClient, rootEntityType: String, sanitizedEntityName: String): Properties = { + val entityConfigPath = getEntityConfigPath(rootEntityType, sanitizedEntityName) + // readDataMaybeNull returns Some(null) if the path exists, but there is no data + val str = zkClient.readDataMaybeNull(entityConfigPath)._1.orNull + val props = new Properties() + if (str != null) { + Json.parseFull(str).foreach { jsValue => + val jsObject = jsValue.asJsonObjectOption.getOrElse { + throw new IllegalArgumentException(s"Unexpected value in config: $str, entity_config_path: $entityConfigPath") + } + require(jsObject("version").to[Int] == 1) + val config = jsObject.get("config").flatMap(_.asJsonObjectOption).getOrElse { + throw new IllegalArgumentException(s"Invalid $entityConfigPath config: $str") + } + config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) } + } + } + props + } + def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] = zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 634d2d5..8ec8c02 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -55,7 +55,7 @@ class Partition(val topic: String, // Do not use replicaManager if this partition is ReplicaManager.OfflinePartition private val localBrokerId = if (!isOffline) replicaManager.config.brokerId else -1 private val logManager = if (!isOffline) replicaManager.logManager else null - private val zkUtils = if (!isOffline) replicaManager.zkUtils else null + private val zkUtils = if (!isOffline) replicaManager.zkClient else null // allReplicasMap includes both assigned replicas and the future replica if there is ongoing replica movement private val allReplicasMap = new Pool[Int, Replica] // The read lock is 
only required when multiple reads are executed and needs to be in a consistent manner @@ -172,7 +172,7 @@ class Partition(val topic: String, allReplicasMap.getAndMaybePut(replicaId, { if (isReplicaLocal(replicaId)) { val config = LogConfig.fromProps(logManager.defaultConfig.originals, - AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)) + AdminUtils.fetchEntityConfigZkClient(zkUtils, ConfigType.Topic, topic)) val log = logManager.getOrCreateLog(topicPartition, config, isNew, replicaId == Request.FutureLocalReplicaId) val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent) val offsetMap = checkpoint.read() diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index f8111ff..6886a03 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -332,7 +332,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP } protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = - new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers, + new ReplicaManager(config, metrics, time, zkClient, kafkaScheduler, logManager, isShuttingDown, quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel) private def initZk(): ZkUtils = { diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 3bc68da..7bd5f09 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -155,7 +155,7 @@ class ReplicaFetcherThread(name: String, // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election. 
// This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise, // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration. - if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils, + if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfigZkClient(replicaMgr.zkClient, ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) { // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly. fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " + diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 101eaae..e534bea 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -30,6 +30,7 @@ import kafka.metrics.KafkaMetricsGroup import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils._ +import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic @@ -132,7 +133,7 @@ object ReplicaManager { class ReplicaManager(val config: KafkaConfig, metrics: Metrics, time: Time, - val zkUtils: ZkUtils, + val zkClient: KafkaZkClient, scheduler: Scheduler, val logManager: LogManager, val isShuttingDown: AtomicBoolean, @@ -148,7 +149,7 @@ class ReplicaManager(val config: KafkaConfig, def this(config: KafkaConfig, metrics: Metrics, time: Time, - zkUtils: ZkUtils, + zkClient: KafkaZkClient, scheduler: Scheduler, logManager: LogManager, isShuttingDown: AtomicBoolean, @@ -157,7 +158,7 @@ class ReplicaManager(val config: KafkaConfig, metadataCache: MetadataCache, 
logDirFailureChannel: LogDirFailureChannel, threadNamePrefix: Option[String] = None) { - this(config, metrics, time, zkUtils, scheduler, logManager, isShuttingDown, + this(config, metrics, time, zkClient, scheduler, logManager, isShuttingDown, quotaManagers, brokerTopicStats, metadataCache, logDirFailureChannel, DelayedOperationPurgatory[DelayedProduce]( purgatoryName = "Produce", brokerId = config.brokerId, @@ -265,7 +266,7 @@ class ReplicaManager(val config: KafkaConfig, if (isrChangeSet.nonEmpty && (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now || lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) { - ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet) + ReplicationUtils.propagateIsrChanges(zkClient, isrChangeSet) isrChangeSet.clear() lastIsrPropagationMs.set(now) } @@ -1444,7 +1445,7 @@ class ReplicaManager(val config: KafkaConfig, s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.") } logManager.handleLogDirFailure(dir) - LogDirUtils.propagateLogDirEvent(zkUtils, localBrokerId) + LogDirUtils.propagateLogDirEvent(zkClient, localBrokerId) info(s"Stopped serving replicas in dir $dir") } diff --git a/core/src/main/scala/kafka/utils/LogDirUtils.scala b/core/src/main/scala/kafka/utils/LogDirUtils.scala index 8457ce5..f32beef 100644 --- a/core/src/main/scala/kafka/utils/LogDirUtils.scala +++ b/core/src/main/scala/kafka/utils/LogDirUtils.scala @@ -18,6 +18,7 @@ package kafka.utils import kafka.controller.LogDirEventNotificationListener +import kafka.zk.KafkaZkClient import scala.collection.Map object LogDirUtils extends Logging { @@ -25,8 +26,8 @@ object LogDirUtils extends Logging { private val LogDirEventNotificationPrefix = "log_dir_event_" val LogDirFailureEvent = 1 - def propagateLogDirEvent(zkUtils: ZkUtils, brokerId: Int) { - val logDirEventNotificationPath: String = zkUtils.createSequentialPersistentPath( + def 
propagateLogDirEvent(zkClient: KafkaZkClient, brokerId: Int) { + val logDirEventNotificationPath: String = zkClient.createSequentialPersistentPath( ZkUtils.LogDirEventNotificationPath + "/" + LogDirEventNotificationPrefix, logDirFailureEventZkData(brokerId)) debug(s"Added $logDirEventNotificationPath for broker $brokerId") } diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala index cc08055..5ca970b 100644 --- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala +++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala @@ -20,6 +20,7 @@ package kafka.utils import kafka.api.LeaderAndIsr import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch} import kafka.utils.ZkUtils._ +import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.zookeeper.data.Stat @@ -29,26 +30,26 @@ object ReplicationUtils extends Logging { private val IsrChangeNotificationPrefix = "isr_change_" - def updateLeaderAndIsr(zkUtils: ZkUtils, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int, + def updateLeaderAndIsr(zkClient: KafkaZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int, zkVersion: Int): (Boolean,Int) = { debug(s"Updated ISR for $topic-$partitionId to ${newLeaderAndIsr.isr.mkString(",")}") val path = getTopicPartitionLeaderAndIsrPath(topic, partitionId) - val newLeaderData = zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch) + val newLeaderData = zkClient.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch) // use the epoch of the controller that made the leadership decision, instead of the current controller epoch - val updatePersistentPath: (Boolean, Int) = zkUtils.conditionalUpdatePersistentPath(path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData)) + val updatePersistentPath: (Boolean, Int) = zkClient.conditionalUpdatePath(path, newLeaderData, 
zkVersion, Some(checkLeaderAndIsrZkData)) updatePersistentPath } - def propagateIsrChanges(zkUtils: ZkUtils, isrChangeSet: Set[TopicPartition]): Unit = { - val isrChangeNotificationPath: String = zkUtils.createSequentialPersistentPath( + def propagateIsrChanges(zkClient: KafkaZkClient, isrChangeSet: Set[TopicPartition]): Unit = { + val isrChangeNotificationPath: String = zkClient.createSequentialPersistentPath( ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix, generateIsrChangeJson(isrChangeSet)) debug(s"Added $isrChangeNotificationPath for $isrChangeSet") } - private def checkLeaderAndIsrZkData(zkUtils: ZkUtils, path: String, expectedLeaderAndIsrInfo: String): (Boolean, Int) = { + private def checkLeaderAndIsrZkData(zkClient: KafkaZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean, Int) = { try { - val writtenLeaderAndIsrInfo = zkUtils.readDataMaybeNull(path) + val writtenLeaderAndIsrInfo = zkClient.readDataMaybeNull(path) val writtenLeaderOpt = writtenLeaderAndIsrInfo._1 val writtenStat = writtenLeaderAndIsrInfo._2 val expectedLeader = parseLeaderAndIsr(expectedLeaderAndIsrInfo, path, writtenStat) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 026dc9d..841b67c 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -48,6 +48,29 @@ import scala.collection.mutable.ArrayBuffer class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends Logging { import KafkaZkClient._ + def createSequentialPersistentPath(path: String, data: String = ""): String = { + val createRequest = CreateRequest(path, data.getBytes("UTF-8"), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) + val createResponse = retryRequestUntilConnected(createRequest) + createResponse.path + } + + def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = { + Json.encode(Map("version" -> 1, 
"leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch, + "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr)) + } + + def readDataMaybeNull(path: String): (Option[String], Stat) = { + val getDataRequest = GetDataRequest(path) + val getDataResponse = retryRequestUntilConnected(getDataRequest) + + if (getDataResponse.resultCode == Code.NONODE) { + (None, getDataResponse.stat) + } else { + val data = new String(getDataResponse.data, UTF_8) + (Some(data), getDataResponse.stat) + } + } + /** * Gets topic partition states for the given partitions. * @param partitions the partitions for which we want ot get states. @@ -739,7 +762,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends } } - private[zk] def pathExists(path: String): Boolean = { + def pathExists(path: String): Boolean = { val getDataRequest = GetDataRequest(path) val getDataResponse = retryRequestUntilConnected(getDataRequest) getDataResponse.resultCode == Code.OK