diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 518d2df..a9c0465 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,7 @@ package kafka.cluster
 
 import kafka.common._
 import kafka.admin.AdminUtils
-import kafka.utils.{ZkUtils, Pool, Time, Logging}
+import kafka.utils.{ZkUtils, ReplicationUtils, Pool, Time, Logging}
 import kafka.utils.Utils.inLock
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
@@ -216,7 +216,7 @@ class Partition(val topic: String,
       inSyncReplicas = Set.empty[Replica]
       leaderEpoch = leaderAndIsr.leaderEpoch
       zkVersion = leaderAndIsr.zkVersion
-      
+
       leaderReplicaIdOpt.foreach { leaderReplica =>
         if (topic == OffsetManager.OffsetsTopicName &&
            /* if we are making a leader->follower transition */
@@ -261,7 +261,15 @@ class Partition(val topic: String,
           info("Expanding ISR for partition [%s,%d] from %s to %s"
                        .format(topic, partitionId, inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
           // update ISR in ZK and cache
-          updateIsr(newInSyncReplicas)
+          val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId,
+            leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
+          if(updateSucceeded) {
+            inSyncReplicas = newInSyncReplicas
+            zkVersion = newVersion
+            trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion))
+          } else {
+            info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+          }
           replicaManager.isrExpandRate.mark()
         }
         maybeIncrementLeaderHW(leaderReplica)
@@ -325,7 +333,15 @@ class Partition(val topic: String,
           info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
             inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
           // update ISR in zk and in cache
-          updateIsr(newInSyncReplicas)
+          val (updateSucceeded,newVersion) = ReplicationUtils.updateIsr(zkClient, topic, partitionId, localBrokerId,
+            leaderEpoch, controllerEpoch, zkVersion, newInSyncReplicas)
+          if(updateSucceeded) {
+            inSyncReplicas = newInSyncReplicas
+            zkVersion = newVersion
+            trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newInSyncReplicas.mkString(","), zkVersion))
+          } else {
+            info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
+          }
           // we may need to increment high watermark since ISR could be down to 1
           maybeIncrementLeaderHW(leaderReplica)
           replicaManager.isrShrinkRate.mark()
@@ -373,22 +389,6 @@ class Partition(val topic: String,
     }
   }
 
-  private def updateIsr(newIsr: Set[Replica]) {
-    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
-    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
-    val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
-      ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
-    if (updateSucceeded){
-      inSyncReplicas = newIsr
-      zkVersion = newVersion
-      trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
-    } else {
-      info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
-    }
-  }
-
   override def equals(that: Any): Boolean = {
     if(!(that.isInstanceOf[Partition]))
       return false
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index e776423..84ddc9f 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -981,7 +981,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
           zkClient,
           ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
           ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
-          leaderAndIsr.zkVersion)
+          leaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
         newLeaderAndIsr.zkVersion = newVersion
 
         finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
@@ -1035,7 +1035,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
           zkClient,
           ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
          ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
-          leaderAndIsr.zkVersion)
+          leaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
         newLeaderAndIsr.zkVersion = newVersion
         finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
         if (updateSucceeded)
@@ -1333,6 +1333,16 @@ case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controlle
     leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
     leaderAndIsrInfo.toString()
   }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case null => false
+      case n: LeaderIsrAndControllerEpoch =>
+        leaderAndIsr.leader == n.leaderAndIsr.leader && leaderAndIsr.isr.sorted == n.leaderAndIsr.isr.sorted &&
+          leaderAndIsr.leaderEpoch == n.leaderAndIsr.leaderEpoch && controllerEpoch == n.controllerEpoch
+      case _ => false
+    }
+  }
 }
 
 object ControllerStats extends KafkaMetricsGroup {
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 6457b56..e29e470 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -22,7 +22,7 @@ import collection.mutable.Buffer
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
-import kafka.utils.{Logging, ZkUtils}
+import kafka.utils.{Logging, ZkUtils, ReplicationUtils}
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.log4j.Logger
@@ -336,7 +336,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
         val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
           ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-          ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
+          ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion,Some(ReplicationUtils.checkLeaderAndIsrZkData))
         newLeaderAndIsr = leaderAndIsr
         newLeaderAndIsr.zkVersion = newVersion
         zookeeperPathUpdateSucceeded = updateSucceeded
@@ -521,5 +521,3 @@ case object NewPartition extends PartitionState { val state: Byte = 0 }
 case object OnlinePartition extends PartitionState { val state: Byte = 1 }
 case object OfflinePartition extends PartitionState { val state: Byte = 2 }
 case object NonExistentPartition extends PartitionState { val state: Byte = 3 }
-
-
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
new file mode 100644
index 0000000..eb53837
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+import kafka.cluster.Replica
+import kafka.api.LeaderAndIsr
+import kafka.controller.LeaderIsrAndControllerEpoch
+import org.apache.zookeeper.data.Stat
+import org.I0Itec.zkclient.ZkClient
+
+import scala.Some
+import scala.collection._
+
+object ReplicationUtils extends Logging {
+
+  def updateIsr(zkClient: ZkClient, topic: String, partitionId: Int, brokerId: Int, leaderEpoch: Int,
+    controllerEpoch: Int, zkVersion: Int, newIsr: Set[Replica]): (Boolean,Int) = {
+    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
+    val newLeaderAndIsr = new LeaderAndIsr(brokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
+    val path = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId)
+    val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
+    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
+    ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion,
+      Some(checkLeaderAndIsrZkData))
+  }
+
+  def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String,newLeaderData: String, zkVersion: Int): (Boolean,Int) = {
+    try {
+      val newLeaderStat: Stat = new Stat()
+      newLeaderStat.setVersion(zkVersion)
+      val newLeader = parseLeaderAndIsr(newLeaderData, path, newLeaderStat)
+      val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient,path)
+      val writtenLeaderOpt = writtenLeaderAndIsrInfo._1
+      val writtenStat = writtenLeaderAndIsrInfo._2
+      writtenLeaderOpt match {
+        case Some(writtenData) =>
+          val writtenLeader = parseLeaderAndIsr(writtenData, path, writtenStat)
+          (newLeader,writtenLeader) match {
+            case (Some(newLeader),Some(writtenLeader)) =>
+              if(newLeader.equals(writtenLeader))
+                return (true,writtenStat.getVersion())
+            case _ =>
+          }
+        case None =>
+      }
+    } catch {
+      case e1: Exception =>
+    }
+    (false,-1)
+  }
+
+  def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat)
+      : Option[LeaderIsrAndControllerEpoch] = {
+    Json.parseFull(leaderAndIsrStr) match {
+      case Some(m) =>
+        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
+        val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
+        val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
+        val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
+        val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
+        val zkPathVersion = stat.getVersion
+        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for leaderAndIsrPath %s".format(leader, epoch,
+          isr.toString(), zkPathVersion, path))
+        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))
+      case None => None
+    }
+  }
+
+}
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index fcbe269..1a23eb4 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -20,7 +20,8 @@ package kafka.utils
 import kafka.cluster.{Broker, Cluster}
 import kafka.consumer.TopicCount
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
-import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
+  ZkMarshallingError, ZkBadVersionException}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import collection._
 import kafka.api.LeaderAndIsr
@@ -58,7 +59,7 @@ object ZkUtils extends Logging {
     getTopicPath(topic) + "/partitions"
   }
 
-  def getTopicConfigPath(topic: String): String = 
+  def getTopicConfigPath(topic: String): String =
     TopicConfigPath + "/" + topic
 
   def getDeleteTopicPath(topic: String): String =
@@ -91,7 +92,7 @@ object ZkUtils extends Logging {
     val leaderAndIsrOpt = leaderAndIsrInfo._1
     val stat = leaderAndIsrInfo._2
     leaderAndIsrOpt match {
-      case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, topic, partition, stat)
+      case Some(leaderAndIsrStr) => ReplicationUtils.parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat)
       case None => None
     }
   }
@@ -99,29 +100,12 @@ object ZkUtils extends Logging {
   def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
     getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
   }
-  
+
   def setupCommonPaths(zkClient: ZkClient) {
     for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath))
       makeSurePersistentPathExists(zkClient, path)
   }
 
-  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat)
-      : Option[LeaderIsrAndControllerEpoch] = {
-    Json.parseFull(leaderAndIsrStr) match {
-      case Some(m) =>
-        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
-        val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
-        val epoch = leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
-        val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
-        val controllerEpoch = leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
-        val zkPathVersion = stat.getVersion
-        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for partition [%s,%d]".format(leader, epoch,
-          isr.toString(), zkPathVersion, topic, partition))
-        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))
-      case None => None
-    }
-  }
-
   def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
     val leaderAndIsrOpt = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndIsrPath(topic, partition))._1
     leaderAndIsrOpt match {
@@ -380,16 +364,26 @@ object ZkUtils extends Logging {
    * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't
    * exist, the current version is not the expected version, etc.) return (false, -1)
    */
-  def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
+  def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int,
+    optionalChecker:Option[(ZkClient, String,String,Int) => (Boolean,Int)] = None): (Boolean, Int) = {
     try {
       val stat = client.writeDataReturnStat(path, data, expectVersion)
       debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d"
         .format(path, data, expectVersion, stat.getVersion))
       (true, stat.getVersion)
     } catch {
-      case e: Exception =>
+      case e1: ZkBadVersionException => {
+        optionalChecker match {
+          case Some(checker) => return checker(client,path,data,expectVersion)
+          case _ => debug("Checker method is not passed skipping zkData match")
+        }
         error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
-          expectVersion, e.getMessage))
+          expectVersion, e1.getMessage))
+        (false, -1)
+      }
+      case e2: Exception =>
+        error("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data,
+          expectVersion, e2.getMessage))
         (false, -1)
     }
   }
@@ -428,7 +422,7 @@ object ZkUtils extends Logging {
       case e2: Throwable => throw e2
     }
   }
-  
+
   def deletePath(client: ZkClient, path: String): Boolean = {
     try {
       client.delete(path)
@@ -451,7 +445,7 @@ object ZkUtils extends Logging {
       case e2: Throwable => throw e2
     }
   }
-  
+
   def maybeDeletePath(zkUrl: String, dir: String) {
     try {
       val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 8cde3c4..de02e47 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -792,20 +792,14 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
         except:
             pass
 
-        # 4. consumer config
-        consumerProperties = {}
-        consumerProperties["consumer.timeout.ms"] = timeoutMs
-        try:
+        # 4. group
+        groupOption = ""
+        try:
             groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "group.id")
-            consumerProperties["group.id"] = groupOption
+            groupOption = "--group " + groupOption
         except:
            pass
 
-        props_file_path=write_consumer_properties(consumerProperties)
-        scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/"
-        logger.debug("executing command [" + scpCmdStr + "]", extra=d)
-        system_test_utils.sys_call(scpCmdStr)
-
         if len(formatterOption) > 0:
             formatterOption = " --formatter " + formatterOption + " "
 
@@ -824,8 +818,9 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
                    kafkaHome + "/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer",
                    "--zookeeper " + zkConnectStr,
                    "--topic " + topic,
-                   "--consumer.config /tmp/consumer.properties",
+                   "--consumer-timeout-ms " + timeoutMs,
                    "--csv-reporter-enabled",
+                   groupOption,
                    formatterOption,
                    "--from-beginning",
                    " >> " + logPathName + "/" + logFile + " & echo pid:$! > ",
@@ -930,20 +925,13 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
             logger.error("Invalid cluster name : " + clusterName, extra=d)
            sys.exit(1)
 
-        consumerProperties = {}
-        consumerProperties["consumer.timeout.ms"] = timeoutMs
-        props_file_path=write_consumer_properties(consumerProperties)
-        scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/"
-        logger.debug("executing command [" + scpCmdStr + "]", extra=d)
-        system_test_utils.sys_call(scpCmdStr)
-
         cmdList = ["ssh " + host,
                    "'JAVA_HOME=" + javaHome,
                    "JMX_PORT=" + jmxPort,
                    kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
                    "--zookeeper " + zkConnectStr,
                    "--topic " + topic,
-                   "--consumer.config /tmp/consumer.properties",
+                   "--consumer-timeout-ms " + timeoutMs,
                    "--csv-reporter-enabled",
                    #"--metrics-dir " + metricsDir,
                    formatterOption,
@@ -2496,12 +2484,4 @@ def get_leader_attributes(systemTestEnv, testcaseEnv):
 
     return leaderDict
 
-
-def write_consumer_properties(consumerProperties):
-    import tempfile
-    props_file_path = tempfile.gettempdir() + "/consumer.properties"
-    consumer_props_file=open(props_file_path,"w")
-    for key,value in consumerProperties.iteritems():
-        consumer_props_file.write(key+"="+value+"\n")
-    consumer_props_file.close()
-    return props_file_path