Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1409619)
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala (working copy)
@@ -372,7 +372,9 @@
     new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
   }
 
-  def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {
+  def makeLeaderForPartition(zkClient: ZkClient, topic: String,
+                             leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int],
+                             controllerEpoch: Int) {
     leaderPerPartitionMap.foreach
     {
       leaderForPartition => {
@@ -390,7 +392,7 @@
             newLeaderAndIsr.zkVersion += 1
           }
           ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-            newLeaderAndIsr.toString)
+            ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
         } catch {
           case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
         }
Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (revision 1409619)
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (working copy)
@@ -69,7 +69,7 @@
     val leaderForPartitionMap = Map(
       0 -> configs.head.brokerId
     )
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val topicMetadata = mockLogManagerAndTestTopic(topic)
     assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
     assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
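The extra parameter means test callers now state which controller epoch a leadership decision belongs to. A minimal sketch of the updated call, assuming the zkClient and topic supplied by the test harness (the epoch value 1 mirrors what the call sites in this patch pass):

    // assign broker 0 as leader of partition 0; controller epoch 1 is recorded
    // in the leader-and-isr value that makeLeaderForPartition writes to ZooKeeper
    val leaderForPartitionMap = scala.collection.immutable.Map(0 -> 0)
    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)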
Index: core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (revision 1409619)
+++ core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (working copy)
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package kafka.zk
 
@@ -35,7 +35,7 @@
     try {
       ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
     } catch {
-      case e: Exception => println("Exception in creating ephemeral node")
+      case e: Exception =>
     }
     var testData: String = null
Index: core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (revision 1409619)
+++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (working copy)
@@ -23,6 +23,10 @@
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.controller.{LeaderIsrAndControllerEpoch, ControllerChannelManager}
+import kafka.cluster.Broker
+import kafka.common.ErrorMapping
+import kafka.api._
 
 class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val brokerId1 = 0
@@ -35,6 +39,8 @@
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
+  var staleControllerEpochDetected = false
+
   override def setUp() {
     super.setUp()
     // start both servers
@@ -95,4 +101,50 @@
     else
       assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
   }
+
+  def testLeaderElectionWithStaleControllerEpoch() {
+    // both brokers are already started by setUp()
+    val topic = "new-topic"
+    val partitionId = 0
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+    // wait until leader is elected
+    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
+    val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+    debug("leader epoch: " + leaderEpoch1)
+    debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
+    assertTrue("Leader should get elected", leader1.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
+    assertEquals("First epoch value should be 0", 0, leaderEpoch1)
+
+    // start another controller
+    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort()))
+    val brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, "localhost", s.config.port))
+    val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig)
+    controllerChannelManager.startup()
+    val staleControllerEpoch = 0
+    val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderIsrAndControllerEpoch]
+    leaderAndIsr.put((topic, partitionId),
+      new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
+    val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch)
+
+    controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
+    TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)
+    assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
+
+    controllerChannelManager.shutdown()
+  }
+
+  private def staleControllerEpochCallback(response: RequestOrResponse): Unit = {
+    val leaderAndIsrResponse = response.asInstanceOf[LeaderAndIsrResponse]
+    staleControllerEpochDetected = leaderAndIsrResponse.errorCode match {
+      case ErrorMapping.StaleControllerEpochCode => true
+      case _ => false
+    }
+  }
 }
\ No newline at end of file
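What the new test exercises is the broker-side fence introduced in ReplicaManager (later in this patch): a control request stamped with an epoch older than the broker's last-seen controller epoch is rejected instead of applied. Condensed, using the names this patch introduces:

    // condensed from ReplicaManager.becomeLeaderOrFollower further down
    if (leaderAndISRRequest.controllerEpoch < controllerEpoch)
      (responseMap, ErrorMapping.StaleControllerEpochCode) // fenced: sender is a stale controller
    else {
      controllerEpoch = leaderAndISRRequest.controllerEpoch // remember the newest epoch seen
      // ... apply the requested leader/follower transitions ...
    }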
Index: core/src/test/scala/unit/kafka/admin/AdminTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/admin/AdminTest.scala (revision 1409619)
+++ core/src/test/scala/unit/kafka/admin/AdminTest.scala (working copy)
@@ -159,7 +159,7 @@
     // create the topic
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas)
     val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
@@ -189,7 +189,7 @@
     TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
     val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
     newTopicMetadata.errorCode match {
Index: core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (revision 1409619)
+++ core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (working copy)
@@ -25,6 +25,7 @@
 import kafka.cluster.Broker
 import collection.mutable._
 import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.controller.LeaderIsrAndControllerEpoch
 
 object SerializationTestUtils{
 
@@ -83,11 +84,11 @@
   private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
 
   def createTestLeaderAndIsrRequest() : LeaderAndIsrRequest = {
-    val leaderAndIsr1 = new LeaderAndIsr(leader1, 1, isr1, 1)
-    val leaderAndIsr2 = new LeaderAndIsr(leader2, 1, isr2, 2)
+    val leaderAndIsr1 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader1, 1, isr1, 1), 1)
+    val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map, collection.immutable.Set[Broker]())
+    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1)
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
@@ -97,13 +98,13 @@
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
-    new StopReplicaRequest(deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
+    new StopReplicaRequest(controllerEpoch = 1, deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
   }
 
   def createTestStopReplicaResponse() : StopReplicaResponse = {
     val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
                           ((topic2, 0), ErrorMapping.NoError))
-    new StopReplicaResponse(1, responseMap)
+    new StopReplicaResponse(1, responseMap.toMap)
   }
 
   def createTestProducerRequest: ProducerRequest = {
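Both control requests now take the sender's controller epoch as a constructor argument, which is what the updated fixtures above serialize and deserialize. A minimal construction sketch; partitionStateInfos and liveLeaders stand in for values built as in the fixtures, and the epoch value 1 is illustrative:

    // the controller epoch travels inside every LeaderAndIsr/StopReplica request
    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, liveLeaders, 1)
    val stopReplicaRequest = new StopReplicaRequest(controllerEpoch = 1, deletePartitions = false,
                                                    partitions = collection.immutable.Set(("my-topic", 0)))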
Index: core/src/test/resources/log4j.properties
===================================================================
--- core/src/test/resources/log4j.properties (revision 1409619)
+++ core/src/test/resources/log4j.properties (working copy)
@@ -18,7 +18,10 @@
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=ERROR
+#log4j.logger.kafka.controller=INFO
+#log4j.logger.kafka.controller.KafkaController$ControllerEpochListener=INFO
+#log4j.logger.kafka.producer=TRACE
+#log4j.logger.kafka.server.ReplicaManager=TRACE
 
 # zkclient can be verbose; during debugging it is common to adjust it separately
 log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
Index: core/src/main/scala/kafka/cluster/Partition.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Partition.scala (revision 1409619)
+++ core/src/main/scala/kafka/cluster/Partition.scala (working copy)
@@ -24,6 +24,7 @@
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.ErrorMapping
+import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
 
 /**
@@ -44,6 +45,12 @@
   private val leaderIsrUpdateLock = new Object
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
   private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
+  /* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
+   * One way of doing that is through the controller's start replica state change command. When a new broker starts up
+   * the controller sends it a start replica command containing the leader for each partition that the broker hosts.
+   * In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
+   * each partition. */
+  private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId)
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
@@ -117,14 +124,18 @@
    *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
    *  4. set the new leader and ISR
    */
-  def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr): Boolean = {
+  def makeLeader(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Boolean = {
     leaderIsrUpdateLock synchronized {
+      val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
         info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become leader request"
           .format(leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
       trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
+      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
+      // to maintain the decision maker controller's epoch in the zookeeper path
+      controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
       // stop replica fetcher thread, if any
       replicaFetcherManager.removeFetcher(topic, partitionId)
@@ -148,14 +159,19 @@
    *  3. set the leader and set ISR to empty
    *  4. start a fetcher to the new leader
    */
-  def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, liveBrokers: Set[Broker]): Boolean = {
+  def makeFollower(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                   liveBrokers: Set[Broker]): Boolean = {
     leaderIsrUpdateLock synchronized {
+      val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       if (leaderEpoch >= leaderAndIsr.leaderEpoch){
-        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follwer request"
+        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follower request"
           .format(leaderEpoch, leaderAndIsr.leaderEpoch))
         return false
       }
       trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
+      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
+      // to maintain the decision maker controller's epoch in the zookeeper path
+      controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
       val newLeaderBrokerId: Int = leaderAndIsr.leader
       info("Starting the follower state transition to follow leader %d for topic %s partition %d"
         .format(newLeaderBrokerId, topic, partitionId))
@@ -290,8 +306,10 @@
   private def updateIsr(newIsr: Set[Replica]) {
     info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(", ")))
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
+    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndIsr.toString(), zkVersion)
+      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
+      ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
     if (updateSucceeded){
       inSyncReplicas = newIsr
       zkVersion = newVersion
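Everywhere the old code wrote newLeaderAndIsr.toString, the leader-and-isr value is now serialized by ZkUtils.leaderAndIsrZkData (defined later in this patch), so that the deciding controller's epoch is persisted next to the leader. Since that helper builds a string-to-string map, the stored value looks roughly like the following (exact spacing depends on Utils.stringMapToJson):

    // a partition led by broker 1 at leader epoch 2, ISR {1,2}, decided by the
    // controller whose epoch was 3
    val json = ZkUtils.leaderAndIsrZkData(new LeaderAndIsr(1, 2, List(1, 2), 0), 3)
    // => { "leader":"1", "leaderEpoch":"2", "ISR":"1,2", "controllerEpoch":"3" }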
Index: core/src/main/scala/kafka/controller/PartitionStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/PartitionStateMachine.scala (revision 1409619)
+++ core/src/main/scala/kafka/controller/PartitionStateMachine.scala (working copy)
@@ -85,7 +85,7 @@
       if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
         handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, offlinePartitionSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
     } catch {
       case e => error("Error while moving some partitions to the online state", e)
     }
@@ -104,8 +104,8 @@
       partitions.foreach { topicAndPartition =>
         handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
-    } catch {
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
+    } catch {
       case e => error("Error while moving some partitions to %s state".format(targetState), e)
     }
   }
@@ -144,7 +144,7 @@
           case _ => // should never come here since illegal previous states are checked above
         }
         info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition,
-          partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leader))
+          partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader))
         partitionState.put(topicAndPartition, OnlinePartition)
         // post: partition has a leader
       case OfflinePartition =>
@@ -231,22 +231,28 @@
       debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
       // make the first replica in the list of assigned replicas, the leader
       val leader = liveAssignedReplicas.head
-      val leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
+      val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
+        controller.epoch)
       try {
         ZkUtils.createPersistentPath(controllerContext.zkClient,
-          ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), leaderAndIsr.toString)
+          ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+          ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
         // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
         // took over and initialized this partition. This can happen if the current controller went into a long
         // GC pause
         brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
-          topicAndPartition.partition, leaderAndIsr, replicaAssignment.size)
-        controllerContext.allLeaders.put(topicAndPartition, leaderAndIsr)
+          topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
+        controllerContext.allLeaders.put(topicAndPartition, leaderIsrAndControllerEpoch)
         partitionState.put(topicAndPartition, OnlinePartition)
       } catch {
         case e: ZkNodeExistsException =>
+          // read the controller epoch
+          val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
+            topicAndPartition.partition).get
           ControllerStat.offlinePartitionRate.mark()
           throw new StateChangeFailedException("Error while changing partition %s's state from New to Online"
-            .format(topicAndPartition) + " since Leader and ISR path already exists")
+            .format(topicAndPartition) + " since Leader and isr path already exists with value " +
+            "%s and controller epoch %d".format(leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch))
       }
     }
   }
@@ -266,22 +272,30 @@
       var newLeaderAndIsr: LeaderAndIsr = null
       var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
       while(!zookeeperPathUpdateSucceeded) {
-        val currentLeaderAndIsr = getLeaderAndIsrOrThrowException(topic, partition)
+        val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition)
+        val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
+        val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
+        if(controllerEpoch > controller.epoch)
+          throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
+            "means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) +
+            "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
        // elect new leader or throw exception
        val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topic, partition, currentLeaderAndIsr)
        val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-          ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
+          ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+          ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
        newLeaderAndIsr = leaderAndIsr
        newLeaderAndIsr.zkVersion = newVersion
        zookeeperPathUpdateSucceeded = updateSucceeded
        replicasForThisPartition = replicas
      }
+      val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
       // update the leader cache
-      controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderAndIsr)
+      controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
       info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
-      // notify all replicas of the new leader
-      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderAndIsr,
-        controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
+      // notify the affected replicas of the new leader and isr
+      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
+        newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
     } catch {
       case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
        .format(topic, partition) + " Marking this partition offline", poe)
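The hunk above turns offline-partition leader election into a read/elect/conditionally-write loop, fenced by the controller epoch stored next to the leader. Stripped to its shape (all names are from this patch):

    // condensed from electLeaderForPartition above
    while (!zookeeperPathUpdateSucceeded) {
      val current = getLeaderIsrAndEpochOrThrowException(topic, partition)
      if (current.controllerEpoch > controller.epoch)
        throw new StateChangeFailedException("a newer controller owns this partition")
      val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topic, partition, current.leaderAndIsr)
      val (ok, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
        ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
        ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), current.leaderAndIsr.zkVersion)
      zookeeperPathUpdateSucceeded = ok // a version conflict means someone else wrote first: re-read and retry
    }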
@@ -299,9 +313,9 @@
     zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic))
   }
 
-  private def getLeaderAndIsrOrThrowException(topic: String, partition: Int): LeaderAndIsr = {
-    ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
-      case Some(currentLeaderAndIsr) => currentLeaderAndIsr
+  private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
+    ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
+      case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
       case None =>
         throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " +
           "[%s, %d] in %s state".format(topic, partition, partitionState(TopicAndPartition(topic, partition))))
Index: core/src/main/scala/kafka/controller/KafkaController.scala
===================================================================
--- core/src/main/scala/kafka/controller/KafkaController.scala (revision 1409619)
+++ core/src/main/scala/kafka/controller/KafkaController.scala (working copy)
@@ -24,23 +24,27 @@
 import kafka.admin.PreferredReplicaLeaderElectionCommand
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException}
+import kafka.common._
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
 import kafka.utils.{Utils, ZkUtils, Logging}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
-import org.I0Itec.zkclient.exception.ZkNoNodeException
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
+import scala.Some
+import kafka.common.TopicAndPartition
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
                         val controllerLock: Object = new Object,
                         var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
                         val brokerShutdownLock: Object = new Object,
+                        var epoch: Int = KafkaController.InitialControllerEpoch - 1,
+                        var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1,
                         var allTopics: Set[String] = Set.empty,
                         var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
-                        var allLeaders: mutable.Map[TopicAndPartition, LeaderAndIsr] = mutable.Map.empty,
+                        var allLeaders: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty,
                         var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
                           new mutable.HashMap,
                         var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
@@ -68,6 +72,8 @@
 
 object KafkaController {
   val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
+  val InitialControllerEpoch = 1
+  val InitialControllerEpochZkVersion = 1
 }
 
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
@@ -82,6 +88,7 @@
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest)
+  registerControllerChangedListener()
 
   newGauge(
"ActiveControllerCount", @@ -90,6 +97,8 @@ } ) + def epoch = controllerContext.epoch + /** * JMX operation to initiate clean shutdown of a broker. On clean shutdown, * the controller first determines the partitions that the shutting down @@ -127,8 +136,8 @@ def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized { trace("All leaders = " + controllerContext.allLeaders.mkString(",")) controllerContext.allLeaders.filter { - case (topicAndPartition, leader) => - leader.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1 + case (topicAndPartition, leaderIsrAndControllerEpoch) => + leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1 }.map(_._1) } @@ -139,18 +148,18 @@ val (topic, partition) = topicAndPartition.asTuple // move leadership serially to relinquish lock. controllerContext.controllerLock synchronized { - controllerContext.allLeaders.get(topicAndPartition).foreach{ currLeaderAndIsr => - if (currLeaderAndIsr.leader == id) { + controllerContext.allLeaders.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch => + if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) { partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, controlledShutdownPartitionLeaderSelector) - val newLeaderAndIsr = controllerContext.allLeaders(topicAndPartition) + val newLeaderIsrAndControllerEpoch = controllerContext.allLeaders(topicAndPartition) // mark replica offline only if leadership was moved successfully - if (newLeaderAndIsr.leader != currLeaderAndIsr.leader) + if (newLeaderIsrAndControllerEpoch.leaderAndIsr.leader != currLeaderIsrAndControllerEpoch.leaderAndIsr.leader) replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica) } else debug("Partition %s moved from leader %d to new leader %d during shutdown." - .format(topicAndPartition, id, currLeaderAndIsr.leader)) + .format(topicAndPartition, id, currLeaderIsrAndControllerEpoch.leaderAndIsr.leader)) } } } @@ -165,18 +174,19 @@ allPartitionsAndReplicationFactorOnBroker foreach { case(topicAndPartition, replicationFactor) => val (topic, partition) = topicAndPartition.asTuple - if (controllerContext.allLeaders(topicAndPartition).leader != id) { + if (controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader != id) { brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false) removeReplicaFromIsr(topic, partition, id) match { - case Some(updatedLeaderAndIsr) => + case Some(updatedLeaderIsrAndControllerEpoch) => brokerRequestBatch.addLeaderAndIsrRequestForBrokers( - Seq(updatedLeaderAndIsr.leader), topic, partition, updatedLeaderAndIsr, replicationFactor) + Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition, + updatedLeaderIsrAndControllerEpoch, replicationFactor) case None => // ignore } } } - brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers) + brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.liveBrokers) val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(","))) @@ -187,15 +197,21 @@ /** * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller. * It does the following things on the become-controller state change - - * 1. 
@@ -187,15 +197,21 @@
   /**
    * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
    * It does the following things on the become-controller state change -
-   * 1. Initializes the controller's context object that holds cache objects for current topics, live brokers and
+   * 1. Register controller epoch changed listener
+   * 2. Increments the controller epoch
+   * 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
    *    leaders for all existing partitions.
-   * 2. Starts the controller's channel manager
-   * 3. Starts the replica state machine
-   * 4. Starts the partition state machine
+   * 4. Starts the controller's channel manager
+   * 5. Starts the replica state machine
+   * 6. Starts the partition state machine
+   * If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
+   * This ensures another controller election will be triggered and there will always be an actively serving controller.
    */
   def onControllerFailover() {
     if(isRunning) {
       info("Broker %d starting become controller state transition".format(config.brokerId))
+      // increment the controller epoch
+      incrementControllerEpoch(zkClient)
       // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
       registerReassignedPartitionsListener()
       registerPreferredReplicaElectionListener()
@@ -205,7 +221,7 @@
       partitionStateMachine.startup()
       replicaStateMachine.startup()
       Utils.registerMBean(this, KafkaController.MBeanName)
-      info("Broker %d is ready to serve as the new controller".format(config.brokerId))
+      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
     }
     else
       info("Controller has been shut down, aborting startup/failover")
@@ -268,7 +284,7 @@
     val deadBrokersSet = deadBrokers.toSet
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
     val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader =>
-      deadBrokersSet.contains(partitionAndLeader._2.leader)).keySet
+      deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader)).keySet
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
@@ -389,6 +405,37 @@
     controllerContext.controllerChannelManager.sendRequest(brokerId, request, callback)
   }
 
+  def incrementControllerEpoch(zkClient: ZkClient) = {
+    try {
+      var newControllerEpoch = controllerContext.epoch + 1
+      val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPathIfExists(zkClient,
+        ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
+      if(!updateSucceeded)
+        throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
+      else {
+        controllerContext.epochZkVersion = newVersion
+        controllerContext.epoch = newControllerEpoch
+      }
+    } catch {
+      case nne: ZkNoNodeException =>
+        // if the path doesn't exist, this is the first controller, whose epoch should be 1
+        // the following call can still fail if another controller gets elected between checking if the path exists and
+        // trying to create the controller epoch path
+        try {
+          zkClient.createPersistent(ZkUtils.ControllerEpochPath, KafkaController.InitialControllerEpoch.toString)
+          controllerContext.epoch = KafkaController.InitialControllerEpoch
+          controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
+        } catch {
+          case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
+            "Aborting controller startup procedure")
+          case oe => error("Error while incrementing controller epoch", oe)
+        }
+      case oe => error("Error while incrementing controller epoch", oe)
+    }
+    info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
+  }
+
   private def registerSessionExpirationListener() = {
     zkClient.subscribeStateChanges(new SessionExpirationListener())
   }
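A worked example of the fencing this gives: suppose broker A became controller and set /controllerEpoch to "1" (stored at ZooKeeper version 1), then stalled in a long GC pause; broker B wins the next election and runs the same conditional update:

    // broker B: CAS /controllerEpoch from version 1 to the next epoch
    val (succeeded, newVersion) =
      ZkUtils.conditionalUpdatePersistentPathIfExists(zkClient, ZkUtils.ControllerEpochPath, "2", 1)
    // succeeded == true for B; when A wakes up and retries with its stale
    // zkVersion, the same call returns (false, -1) and A must throw
    // ControllerMovedException instead of continuing to act as controller.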
leader %d for ".format(leaderAndIsr.leader) + + debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderIsrAndControllerEpoch.leaderAndIsr.leader) + "partition %s is dead, just ignore it".format(topicPartition)) } } @@ -469,7 +516,7 @@ private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) { val reassignedReplicas = reassignedPartitionContext.newReplicas - val currentLeader = controllerContext.allLeaders(topicAndPartition).leader + val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) { info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) + "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(","))) @@ -542,6 +589,10 @@ zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, new PreferredReplicaElectionListener(this)) } + private def registerControllerChangedListener() { + zkClient.subscribeDataChanges(ZkUtils.ControllerEpochPath, new ControllerEpochListener(this)) + } + def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) { // read the current list of reassigned partitions from zookeeper val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient) @@ -570,7 +621,7 @@ def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) { for(partition <- partitionsToBeRemoved) { // check the status - val currentLeader = controllerContext.allLeaders(partition).leader + val currentLeader = controllerContext.allLeaders(partition).leaderAndIsr.leader val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head if(currentLeader == preferredReplica) { info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica)) @@ -598,35 +649,42 @@ * @return the new leaderAndIsr (with the replica removed if it was present), * or None if leaderAndIsr is empty. */ - def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderAndIsr] = { + def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderIsrAndControllerEpoch] = { val topicAndPartition = TopicAndPartition(topic, partition) debug("Removing replica %d from ISR of %s.".format(replicaId, topicAndPartition)) - var finalLeaderAndIsr: Option[LeaderAndIsr] = None + var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None var zkWriteCompleteOrUnnecessary = false while (!zkWriteCompleteOrUnnecessary) { // refresh leader and isr from zookeeper again - val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) - zkWriteCompleteOrUnnecessary = leaderAndIsrOpt match { - case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes + val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) + zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match { + case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes + val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr + val controllerEpoch = leaderIsrAndEpoch.controllerEpoch + if(controllerEpoch > epoch) + throw new StateChangeFailedException("Leader and isr path written by another controller. 
This probably" + + "means the current controller with epoch %d went through a soft failure and another ".format(epoch) + + "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch)) if (leaderAndIsr.isr.contains(replicaId)) { val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1, leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1) // update the new leadership decision in zookeeper or retry - val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(), leaderAndIsr.zkVersion) + val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath( + zkClient, + ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), + ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch), + leaderAndIsr.zkVersion) newLeaderAndIsr.zkVersion = newVersion - finalLeaderAndIsr = Some(newLeaderAndIsr) - if (updateSucceeded) { - // we've successfully written to ZK, let's refresh our cache - info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString())) - controllerContext.allLeaders.put(topicAndPartition, newLeaderAndIsr) - } + finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch)) + if (updateSucceeded) + info("New leader and ISR for partition [%s, %d] is %s" + .format(topic, partition, newLeaderAndIsr.toString())) updateSucceeded } else { warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s" - .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr)) - finalLeaderAndIsr = Some(leaderAndIsr) + .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr)) + finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)) true } case None => @@ -634,7 +692,7 @@ true } } - finalLeaderAndIsr + finalLeaderIsrAndControllerEpoch } class SessionExpirationListener() extends IZkStateListener with Logging { @@ -859,11 +917,50 @@ } } +class ControllerEpochListener(controller: KafkaController) extends IZkDataListener with Logging { + this.logIdent = "[ControllerEpochListener on " + controller.config.brokerId + "]: " + val controllerContext = controller.controllerContext + readControllerEpochFromZookeeper() + + /** + * Invoked when a controller updates the epoch value + * @throws Exception On any error. + */ + @throws(classOf[Exception]) + def handleDataChange(dataPath: String, data: Object) { + debug("Controller epoch listener fired with new epoch " + data.toString) + controllerContext.controllerLock synchronized { + // read the epoch path to get the zk version + readControllerEpochFromZookeeper() + } + } + + /** + * @throws Exception + * On any error. 
+   */
+  @throws(classOf[Exception])
+  def handleDataDeleted(dataPath: String) {
+  }
+
+  private def readControllerEpochFromZookeeper() {
+    // initialize the controller epoch and zk version by reading from zookeeper
+    if(ZkUtils.pathExists(controllerContext.zkClient, ZkUtils.ControllerEpochPath)) {
+      val epochData = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath)
+      controllerContext.epoch = epochData._1.toInt
+      controllerContext.epochZkVersion = epochData._2.getVersion
+      info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
+    }
+  }
+}
+
 case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
                                        var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)
 
 case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
 
+case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
+
 object ControllerStat extends KafkaMetricsGroup {
   val offlinePartitionRate = newMeter("OfflinePartitionsPerSec", "partitions", TimeUnit.SECONDS)
   val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
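Because registerControllerChangedListener (added above) subscribes this listener when KafkaController is constructed, every broker, controller or not, keeps a current copy of the epoch plus the zkVersion needed for later conditional updates. ZkUtils.readData returns the payload together with the ZooKeeper Stat, which is where that version comes from; a sketch of the read performed by readControllerEpochFromZookeeper:

    val (epochString, stat) = ZkUtils.readData(zkClient, ZkUtils.ControllerEpochPath)
    val epoch = epochString.toInt         // e.g. 2
    val epochZkVersion = stat.getVersion  // the CAS token used by incrementControllerEpoch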
Index: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (revision 1409619)
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (working copy)
@@ -83,7 +83,7 @@
     try {
       brokerRequestBatch.newBatch()
       replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
-      brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
+      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some replicas to %s state".format(targetState), e)
     }
@@ -106,14 +106,14 @@
       case NewReplica =>
         assertValidPreviousStates(topic, partition, replicaId, List(NonExistentReplica), targetState)
         // start replica as a follower to the current leader for its partition
-        val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
-        leaderAndIsrOpt match {
-          case Some(leaderAndIsr) =>
-            if(leaderAndIsr.leader == replicaId)
+        val leaderIsrAndControllerEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+        leaderIsrAndControllerEpochOpt match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
              throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica"
                .format(replicaId, topic, partition) + " state as it is being requested to become leader")
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-              topic, partition, leaderAndIsr, replicaAssignment.size)
+              topic, partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
          case None => // new leader request will be sent to this replica when one gets elected
        }
        replicaState.put((topic, partition, replicaId), NewReplica)
@@ -137,13 +137,14 @@
            controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
            info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
          case _ =>
-            // if the leader for this replica exists and is alive, send the leader and ISR
-            controllerContext.allLeaders.get(topicAndPartition) match {
-              case Some(leaderAndIsr) =>
-                controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
+            // check if the leader for this partition is alive or even exists
+            controllerContext.allLeaders.get(topicAndPartition) match {
+              case Some(leaderIsrAndControllerEpoch) =>
+                controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
                  case true => // leader is alive
                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-                      topic, partition, leaderAndIsr, replicaAssignment.size)
+                      topic, partition, leaderIsrAndControllerEpoch,
+                      replicaAssignment.size)
                    replicaState.put((topic, partition, replicaId), OnlineReplica)
                    info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
                  case false => // ignore partitions whose leader is not alive
@@ -155,14 +156,15 @@
      case OfflineReplica =>
        assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
        // As an optimization, the controller removes dead replicas from the ISR
-        val leaderAndIsrIsEmpty = controllerContext.allLeaders.get(topicAndPartition) match {
-          case Some(currLeaderAndIsr) =>
-            if (currLeaderAndIsr.isr.contains(replicaId))
+        val leaderAndIsrIsEmpty: Boolean = controllerContext.allLeaders.get(topicAndPartition) match {
+          case Some(currLeaderIsrAndControllerEpoch) =>
+            if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
              controller.removeReplicaFromIsr(topic, partition, replicaId) match {
-                case Some(updatedLeaderAndIsr) =>
+                case Some(updatedLeaderIsrAndControllerEpoch) =>
                  // send the shrunk ISR state change request only to the leader
-                  brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderAndIsr.leader),
-                    topic, partition, updatedLeaderAndIsr, replicaAssignment.size)
+                  brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
+                    topic, partition, updatedLeaderIsrAndControllerEpoch,
+                    replicaAssignment.size)
                  replicaState.put((topic, partition, replicaId), OfflineReplica)
                  info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
                  info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
Index: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
===================================================================
--- core/src/main/scala/kafka/controller/ControllerChannelManager.scala (revision 1409619)
+++ core/src/main/scala/kafka/controller/ControllerChannelManager.scala (working copy)
@@ -159,12 +159,13 @@
     stopAndDeleteReplicaRequestMap.clear()
   }
 
-  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr, replicationFactor: Int) {
+  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
+                                       leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, replicationFactor: Int) {
     brokerIds.foreach { brokerId =>
       leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
       leaderAndIsrRequestMap(brokerId).put((topic, partition),
-        PartitionStateInfo(leaderAndIsr, replicationFactor))
+        PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor))
     }
   }
 
@@ -183,13 +184,13 @@
     }
   }
 
-  def sendRequestsToBrokers(liveBrokers: Set[Broker]) {
+  def sendRequestsToBrokers(controllerEpoch: Int, liveBrokers: Set[Broker]) {
     leaderAndIsrRequestMap.foreach { m =>
       val broker = m._1
-      val partitionStateInfos = m._2
-      val leaderIds = partitionStateInfos.map(_._2.leaderAndIsr.leader).toSet
+      val partitionStateInfos = m._2.toMap
+      val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
       val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders)
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch)
       debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
@@ -201,7 +202,8 @@
       if (replicas.size > 0) {
         debug("The stop replica request (delete = %s) sent to broker %d is %s"
           .format(deletePartitions, broker, replicas.mkString(",")))
-        sendRequest(broker, new StopReplicaRequest(deletePartitions, Set.empty[(String, Int)] ++ replicas), null)
+        sendRequest(broker, new StopReplicaRequest(deletePartitions,
+          Set.empty[(String, Int)] ++ replicas, controllerEpoch), null)
       }
     }
     m.clear()
Index: core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
===================================================================
--- core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (revision 1409619)
+++ core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (working copy)
@@ -128,7 +128,7 @@
     val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val preferredReplica = assignedReplicas.head
     // check if preferred replica is the current leader
-    val currentLeader = controllerContext.allLeaders(topicAndPartition).leader
+    val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
     if(currentLeader == preferredReplica) {
       throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition [%s,%d]"
         .format(preferredReplica, topic, partition))
Index: core/src/main/scala/kafka/common/ControllerMovedException.scala
===================================================================
--- core/src/main/scala/kafka/common/ControllerMovedException.scala (revision 0)
+++ core/src/main/scala/kafka/common/ControllerMovedException.scala (revision 0)
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.common
+
+class ControllerMovedException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this() = this(null, null)
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/common/ErrorMapping.scala
===================================================================
--- core/src/main/scala/kafka/common/ErrorMapping.scala (revision 1409619)
+++ core/src/main/scala/kafka/common/ErrorMapping.scala (working copy)
@@ -40,6 +40,7 @@
   val BrokerNotAvailableCode: Short = 8
   val ReplicaNotAvailableCode: Short = 9
   val MessageSizeTooLargeCode: Short = 10
+  val StaleControllerEpochCode: Short = 11
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](
@@ -52,7 +53,8 @@
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
       classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
       classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
-      classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode
+      classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode,
+      classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode
     ).withDefaultValue(UnknownCode)
 
   /* invert the mapping */
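A broker that throws ControllerMovedException therefore surfaces to the requester as StaleControllerEpochCode. On the sending side the code can be inspected directly, as the LeaderElectionTest callback earlier in this patch does; a minimal sketch, assuming a response object exposing errorCode the way LeaderAndIsrResponse does:

    response.errorCode match {
      case ErrorMapping.StaleControllerEpochCode =>
        // this controller has been superseded; stop issuing state changes
      case ErrorMapping.NoError =>
        // the broker applied the request
      case otherCode =>
        // handle the remaining error codes as before
    }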
Index: core/src/main/scala/kafka/utils/ZkUtils.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZkUtils.scala (revision 1409619)
+++ core/src/main/scala/kafka/utils/ZkUtils.scala (working copy)
@@ -24,17 +24,19 @@
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import scala.collection._
 import kafka.api.LeaderAndIsr
+import mutable.HashMap
 import org.apache.zookeeper.data.Stat
 import java.util.concurrent.locks.{ReentrantLock, Condition}
-import kafka.controller.{PartitionAndReplica, ReassignedPartitionsContext}
 import kafka.admin._
 import kafka.common.{TopicAndPartition, KafkaException, NoEpochForPartitionException}
+import kafka.controller.{LeaderIsrAndControllerEpoch, PartitionAndReplica, ReassignedPartitionsContext}
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
   val ControllerPath = "/controller"
+  val ControllerEpochPath = "/controllerEpoch"
   val ReassignPartitionsPath = "/admin/reassign_partitions"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
 
@@ -74,7 +76,7 @@
     brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
   }
 
-  def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[LeaderAndIsr] = {
+  def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
     val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
     val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndIsrPath)
     val leaderAndIsrOpt = leaderAndIsrInfo._1
@@ -85,17 +87,23 @@
     }
   }
 
+  def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[LeaderAndIsr] = {
+    getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
+  }
+
-  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat): Option[LeaderAndIsr] = {
+  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat)
+    : Option[LeaderIsrAndControllerEpoch] = {
     Json.parseFull(leaderAndIsrStr) match {
       case Some(m) =>
         val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
         val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
         val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
+        val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
         val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
         val zkPathVersion = stat.getVersion
         debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
           isr.toString(), zkPathVersion, topic, partition))
-        Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion))
+        Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion), controllerEpoch))
       case None => None
     }
   }
@@ -189,6 +197,15 @@
     topicDirs.consumerOwnerDir + "/" + partition
   }
 
+  def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
+    val jsonDataMap = new HashMap[String, String]
+    jsonDataMap.put("leader", leaderAndIsr.leader.toString)
+    jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
+    jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
+    jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
+    Utils.stringMapToJson(jsonDataMap)
+  }
+
   /**
    * make sure a persistent path exists in ZK. Create the path if not exist.
   */
@@ -314,6 +331,25 @@
   }
 
   /**
+   * Conditionally update the persistent path data. Returns (true, newVersion) if the update succeeds; otherwise (for
+   * example, when the current version does not match the expected version) returns (false, -1). If the path doesn't
+   * exist, throws ZkNoNodeException.
+   */
+  def conditionalUpdatePersistentPathIfExists(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
+    try {
+      val stat = client.writeData(path, data, expectVersion)
+      info("Conditional update of zkPath %s with value %s and expected version %d succeeded, returning the new version: %d"
+        .format(path, data, expectVersion, stat.getVersion))
+      (true, stat.getVersion)
+    } catch {
+      case nne: ZkNoNodeException => throw nne
+      case e: Exception =>
+        error("Conditional update of zkPath %s with data %s and expected version %d failed".format(path, data,
+          expectVersion), e)
+        (false, -1)
+    }
+  }
+
+  /**
   * Update the value of a persistent node with the given path and data.
   * create parent directory if necessary. Never throw NodeExistException.
   */
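Unlike the pre-existing conditionalUpdatePersistentPath, this variant re-throws ZkNoNodeException instead of treating a missing node as an ordinary failure; incrementControllerEpoch above depends on that to distinguish "first controller ever" from "lost the CAS race". A usage sketch, where newEpoch and expectedZkVersion are placeholders:

    try {
      val (ok, _) = ZkUtils.conditionalUpdatePersistentPathIfExists(
        zkClient, ZkUtils.ControllerEpochPath, newEpoch.toString, expectedZkVersion)
      if (!ok) {
        // another controller won the conditional write: give up controllership
      }
    } catch {
      case nne: ZkNoNodeException =>
        // the epoch path has never been created: this is the very first controller
    }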
Index: core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
===================================================================
--- core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala (revision 1409619)
+++ core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala (working copy)
@@ -44,8 +44,6 @@
     }
   }

-  def amILeader : Boolean = leaderId == brokerId
-
   def elect: Boolean = {
     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
     try {
@@ -56,10 +54,14 @@
     } catch {
       case e: ZkNodeExistsException =>
         // If someone else has written the path, then
-        debug("Someone else was elected as leader other than " + brokerId)
         val data: String = controllerContext.zkClient.readData(electionPath, true)
-        if (data != null) leaderId = data.toInt
-      case e2 => throw e2
+        if (data != null) {
+          leaderId = data.toInt
+          debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+        }
+      case e2 =>
+        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
+        resign()
     }
     amILeader
   }
@@ -68,6 +70,12 @@
     leaderId = -1
   }

+  def amILeader : Boolean = leaderId == brokerId
+
+  def resign() = {
+    deletePath(controllerContext.zkClient, electionPath)
+  }
+
   /**
   * We do not have session expiration listen in the ZkElection, but assuming the caller who uses this module will
   * have its own session expiration listener and handler
@@ -79,6 +87,10 @@
   */
   @throws(classOf[Exception])
   def handleDataChange(dataPath: String, data: Object) {
+    controllerContext.controllerLock synchronized {
+      leaderId = data.toString.toInt
+      info("New leader is %d".format(leaderId))
+    }
   }

   /**
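The elector changes make every watcher converge on the current winner instead of only learning it at election time; a rough sketch of the intended usage (hypothetical harness, assuming leaderId is readable from the elector):

    val wonElection = elector.elect   // create the ephemeral election znode, or adopt the existing winner
    if (!wonElection)
      debug("Standing by; leaderChangeListener keeps leaderId current under controllerLock")
    // on any unexpected election error, resign() deletes the election path so a
    // half-created znode cannot wedge the next elect() attempt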
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala (revision 1409619)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala (working copy)
@@ -18,6 +18,7 @@
 import kafka.cluster.{Broker, Partition, Replica}
 import collection._
+import mutable.HashMap
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils._
@@ -26,7 +27,8 @@
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
 import kafka.common._
-import kafka.api.{PartitionStateInfo, LeaderAndIsrRequest}
+import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
+import kafka.controller.KafkaController

 object ReplicaManager {
@@ -38,6 +40,8 @@
                      val zkClient: ZkClient,
                      kafkaScheduler: KafkaScheduler,
                      val logManager: LogManager) extends Logging with KafkaMetricsGroup {
+  /* epoch of the controller that last changed the leader */
+  @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
   private val allPartitions = new Pool[(String, Int), Partition]
   private var leaderPartitions = new mutable.HashSet[Partition]()
   private val leaderPartitionsLock = new Object
@@ -110,6 +114,23 @@
     errorCode
   }

+  def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
+    val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+    if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
+      error("Received stop replica request from an old controller epoch %d.".format(stopReplicaRequest.controllerEpoch) +
+        " Latest known controller epoch is %d".format(controllerEpoch))
+      (responseMap, ErrorMapping.StaleControllerEpochCode)
+    } else {
+      controllerEpoch = stopReplicaRequest.controllerEpoch
+      for((topic, partitionId) <- stopReplicaRequest.partitions){
+        val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
+        responseMap.put((topic, partitionId), errorCode)
+      }
+      (responseMap, ErrorMapping.NoError)
+    }
+  }
+
   def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = {
     var partition = allPartitions.get((topic, partitionId))
     if (partition == null) {
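Both stopReplicas and becomeLeaderOrFollower (below) now gate on the controller epoch before touching any partition state. The decision rule, distilled into a stand-alone sketch (hypothetical helper, not in the patch):

    // Requests from an older controller generation are rejected wholesale; newer or
    // equal generations are accepted and recorded as the latest known epoch.
    def checkControllerEpoch(requestEpoch: Int, latestKnownEpoch: Int): Either[Short, Int] =
      if (requestEpoch < latestKnownEpoch)
        Left(ErrorMapping.StaleControllerEpochCode) // sender lost a controller election
      else
        Right(requestEpoch)                         // accept and remember this epoch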
@@ -158,49 +179,42 @@
     }
   }

-  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): collection.Map[(String, Int), Short] = {
+  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
     info("Handling leader and isr request %s".format(leaderAndISRRequest))
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+    if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
+      error("Received leader and isr request from an old controller epoch %d.".format(leaderAndISRRequest.controllerEpoch) +
+        " Latest known controller epoch is %d".format(controllerEpoch))
+      (responseMap, ErrorMapping.StaleControllerEpochCode)
+    } else {
+      controllerEpoch = leaderAndISRRequest.controllerEpoch
+      for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) {
+        var errorCode = ErrorMapping.NoError
+        val topic = topicAndPartition._1
+        val partitionId = topicAndPartition._2

-    for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos){
-      var errorCode = ErrorMapping.NoError
-      val topic = topicAndPartition._1
-      val partitionId = topicAndPartition._2
-
-      val requestedLeaderId = partitionStateInfo.leaderAndIsr.leader
-      try {
-        if(requestedLeaderId == config.brokerId)
-          makeLeader(topic, partitionId, partitionStateInfo)
-        else
-          makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
-      } catch {
-        case e =>
-          error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
-          errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+        val requestedLeaderId = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+        try {
+          if(requestedLeaderId == config.brokerId)
+            makeLeader(topic, partitionId, partitionStateInfo)
+          else
+            makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
+        } catch {
+          case e =>
+            error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
+            errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+        }
+        responseMap.put(topicAndPartition, errorCode)
       }
-      responseMap.put(topicAndPartition, errorCode)
+      (responseMap, ErrorMapping.NoError)
     }
-
-    /**
-     * If IsInit flag is on, this means that the controller wants to treat topics not in the request
-     * as deleted.
-     * TODO: Handle this properly as part of KAFKA-330
-     */
-//    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
-//      startHighWaterMarksCheckPointThread
-//      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.partitionStateInfos.contains(p._1)).map(entry => entry._1)
-//      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
-//      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
-//    }
-
-    responseMap
   }

   private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = {
-    val leaderAndIsr = partitionStateInfo.leaderAndIsr
+    val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
     info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeader(topic, partitionId, leaderAndIsr)) {
+    if (partition.makeLeader(topic, partitionId, leaderIsrAndControllerEpoch)) {
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
@@ -209,14 +223,15 @@
     info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
   }

-  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo, liveBrokers: Set[Broker]) {
-    val leaderAndIsr = partitionStateInfo.leaderAndIsr
-    val leaderBrokerId: Int = leaderAndIsr.leader
+  private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo,
+                           liveBrokers: Set[Broker]) {
+    val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
+    val leaderBrokerId: Int = leaderIsrAndControllerEpoch.leaderAndIsr.leader
     info("Starting the follower state transition to follow leader %d for topic %s partition %d"
       .format(leaderBrokerId, topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeFollower(topic, partitionId, leaderAndIsr, liveBrokers)) {
+    if (partition.makeFollower(topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition
@@ -233,7 +248,7 @@

   def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
     val partitionOpt = getPartition(topic, partitionId)
-    if(partitionOpt.isDefined){
+    if(partitionOpt.isDefined) {
       partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset)
     } else {
       warn("While recording the follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId))
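Errors are now reported at two levels: a request-wide code for wholesale rejection (stale epoch) and a per-partition map for individual transition failures. A caller-side sketch of how the two interact (hypothetical consumer of the tuple):

    val (partitionErrors, globalError) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
    if (globalError == ErrorMapping.StaleControllerEpochCode) {
      // nothing was applied; the sending controller has been superseded
    } else {
      for (((topic, partition), code) <- partitionErrors if code != ErrorMapping.NoError)
        warn("State transition failed for [%s, %d] with error code %d".format(topic, partition, code))
    }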
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala (revision 1409619)
+++ core/src/main/scala/kafka/server/KafkaApis.scala (working copy)
@@ -24,7 +24,6 @@
 import kafka.utils.{Pool, SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
-import mutable.HashMap
 import kafka.network.RequestChannel.Response
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
@@ -127,8 +126,8 @@
       requestLogger.trace("Handling leader and ISR request " + leaderAndIsrRequest)
     trace("Handling leader and ISR request " + leaderAndIsrRequest)
     try {
-      val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
-      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, responseMap)
+      val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
+      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, response, error)
       requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
     } catch {
       case e: KafkaStorageException =>
@@ -144,13 +143,8 @@
       requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
     trace("Handling stop replica request " + stopReplicaRequest)

-    val responseMap = new HashMap[(String, Int), Short]
-    for((topic, partitionId) <- stopReplicaRequest.partitions) {
-      val errorCode = replicaManager.stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
-      responseMap.put((topic, partitionId), errorCode)
-    }
-
-    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
+    val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
+    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, response.toMap, error)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
   }
Index: core/src/main/scala/kafka/api/StopReplicaRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/StopReplicaRequest.scala (revision 1409619)
+++ core/src/main/scala/kafka/api/StopReplicaRequest.scala (working copy)
@@ -33,6 +33,7 @@
     val versionId = buffer.getShort
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerEpoch = buffer.getInt
     val deletePartitions = buffer.get match {
       case 1 => true
       case 0 => false
@@ -44,7 +45,7 @@
     (1 to topicPartitionPairCount) foreach { _ =>
       topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
     }
-    StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet)
+    StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch)
   }
 }
@@ -52,18 +53,20 @@
                              clientId: String,
                              ackTimeoutMs: Int,
                              deletePartitions: Boolean,
-                             partitions: Set[(String, Int)])
+                             partitions: Set[(String, Int)],
+                             controllerEpoch: Int)
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {

-  def this(deletePartitions: Boolean, partitions: Set[(String, Int)]) = {
+  def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int) = {
     this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
-      deletePartitions, partitions)
+      deletePartitions, partitions, controllerEpoch)
   }

   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerEpoch)
     buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
     buffer.putInt(partitions.size)
     for ((topic, partitionId) <- partitions){
@@ -77,6 +80,7 @@
     2 + /* versionId */
     ApiUtils.shortStringLength(clientId) +
     4 + /* ackTimeoutMs */
+    4 + /* controller epoch */
     1 + /* deletePartitions */
     4 /* partition count */
     for ((topic, partitionId) <- partitions){
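The new field slots in between ackTimeoutMs and deletePartitions, so the serialized layout of a StopReplicaRequest becomes (reading writeTo top to bottom; values illustrative):

    //   versionId        : 2 bytes
    //   clientId         : 2-byte length + N bytes (short string)
    //   ackTimeoutMs     : 4 bytes
    //   controllerEpoch  : 4 bytes   <-- new
    //   deletePartitions : 1 byte
    //   partition count  : 4 bytes
    //   per partition    : short string topic + 4-byte partition id
    val request = new StopReplicaRequest(false, Set(("test", 0)), 2) // deletePartitions, partitions, controllerEpoch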
Index: core/src/main/scala/kafka/api/StopReplicaResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/StopReplicaResponse.scala (revision 1409619)
+++ core/src/main/scala/kafka/api/StopReplicaResponse.scala (working copy)
@@ -19,13 +19,15 @@
 import java.nio.ByteBuffer
 import collection.mutable.HashMap
-import collection.Map
+import collection.immutable.Map
+import kafka.common.ErrorMapping
 import kafka.api.ApiUtils._

 object StopReplicaResponse {
   def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
     val versionId = buffer.getShort
+    val errorCode = buffer.getShort
     val numEntries = buffer.getInt

     val responseMap = new HashMap[(String, Int), Short]()
@@ -35,23 +37,31 @@
       val partitionErrorCode = buffer.getShort()
       responseMap.put((topic, partition), partitionErrorCode)
     }
-    new StopReplicaResponse(versionId, responseMap)
+    new StopReplicaResponse(versionId, responseMap.toMap, errorCode)
   }
 }

 case class StopReplicaResponse(val versionId: Short,
-                               val responseMap: Map[(String, Int), Short]) extends RequestOrResponse{
+                               val responseMap: Map[(String, Int), Short],
+                               val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
   def sizeInBytes(): Int ={
-    var size = 2 + 4
-    for ((key, value) <- responseMap){
-      size += (2 + key._1.length) + 4 + 2
+    var size =
+      2 /* version id */ +
+      2 /* error code */ +
+      4 /* number of responses */
+    for ((key, value) <- responseMap) {
+      size +=
+        2 + key._1.length /* topic */ +
+        4 /* partition */ +
+        2 /* error code for this partition */
     }
     size
   }

   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
     for ((key:(String, Int), value) <- responseMap){
       writeShortString(buffer, key._1)
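A quick arithmetic check of the rewritten sizeInBytes, for a response carrying one entry for topic "test", partition 0 (illustrative values):

    // fixed header: 2 (version id) + 2 (error code) + 4 (entry count)         =  8 bytes
    // one entry: (2 + 4) short string "test" + 4 (partition) + 2 (entry code) = 12 bytes
    val response = StopReplicaResponse(1.toShort, Map(("test", 0) -> ErrorMapping.NoError))
    // response.sizeInBytes() == 20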
Index: core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (revision 1409619)
+++ core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (working copy)
@@ -21,9 +21,8 @@
 import java.nio._
 import kafka.utils._
 import kafka.api.ApiUtils._
-import collection.mutable.Map
-import collection.mutable.HashMap
 import kafka.cluster.Broker
+import kafka.controller.LeaderIsrAndControllerEpoch

 object LeaderAndIsr {
@@ -35,7 +34,7 @@
   def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)

   override def toString(): String = {
-    val jsonDataMap = new HashMap[String, String]
+    val jsonDataMap = new collection.mutable.HashMap[String, String]
     jsonDataMap.put("leader", leader.toString)
     jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
     jsonDataMap.put("ISR", isr.mkString(","))
@@ -43,35 +42,42 @@
   }
 }

-
 object PartitionStateInfo {
   def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
+    val controllerEpoch = buffer.getInt
     val leader = buffer.getInt
-    val leaderGenId = buffer.getInt
+    val leaderEpoch = buffer.getInt
     val isrString = readShortString(buffer)
     val isr = isrString.split(",").map(_.toInt).toList
     val zkVersion = buffer.getInt
     val replicationFactor = buffer.getInt
-    PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, isr, zkVersion), replicationFactor)
+    PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr, zkVersion), controllerEpoch),
+      replicationFactor)
   }
 }

-case class PartitionStateInfo(val leaderAndIsr: LeaderAndIsr, val replicationFactor: Int) {
+case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, val replicationFactor: Int) {
   def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(leaderAndIsr.leader)
-    buffer.putInt(leaderAndIsr.leaderEpoch)
-    writeShortString(buffer, leaderAndIsr.isr.mkString(","))
-    buffer.putInt(leaderAndIsr.zkVersion)
+    buffer.putInt(leaderIsrAndControllerEpoch.controllerEpoch)
+    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leader)
+    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch)
+    writeShortString(buffer, leaderIsrAndControllerEpoch.leaderAndIsr.isr.mkString(","))
+    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion)
     buffer.putInt(replicationFactor)
   }

   def sizeInBytes(): Int = {
-    val size = 4 + 4 + (2 + leaderAndIsr.isr.mkString(",").length) + 4 + 4
+    val size =
+      4 /* epoch of the controller that elected the leader */ +
+      4 /* leader broker id */ +
+      4 /* leader epoch */ +
+      (2 + leaderIsrAndControllerEpoch.leaderAndIsr.isr.mkString(",").length) +
+      4 /* zk version */ +
+      4 /* replication factor */
     size
   }
 }

-
 object LeaderAndIsrRequest {
   val CurrentVersion = 1.shortValue()
   val DefaultClientId = ""
@@ -83,8 +89,9 @@
     val versionId = buffer.getShort
     val clientId = readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
+    val controllerEpoch = buffer.getInt
     val partitionStateInfosCount = buffer.getInt
-    val partitionStateInfos = new HashMap[(String, Int), PartitionStateInfo]
+    val partitionStateInfos = new collection.mutable.HashMap[(String, Int), PartitionStateInfo]

     for(i <- 0 until partitionStateInfosCount){
       val topic = readShortString(buffer)
@@ -99,26 +106,28 @@
     for (i <- 0 until leadersCount)
       leaders += Broker.readFrom(buffer)

-    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos, leaders)
+    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
   }
 }

-
 case class LeaderAndIsrRequest (versionId: Short,
                                 clientId: String,
                                 ackTimeoutMs: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
-                                leaders: Set[Broker])
+                                leaders: Set[Broker],
+                                controllerEpoch: Int)
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {

-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker]) = {
-    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionStateInfos, liveBrokers)
+  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerEpoch: Int) = {
+    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
+      partitionStateInfos, liveBrokers, controllerEpoch)
   }

   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerEpoch)
     buffer.putInt(partitionStateInfos.size)
     for((key, value) <- partitionStateInfos){
       writeShortString(buffer, key._1)
@@ -130,12 +139,17 @@
   }

   def sizeInBytes(): Int = {
-    var size = 1 + 2 + (2 + clientId.length) + 4 + 4
+    var size =
+      2 /* version id */ +
+      (2 + clientId.length) /* client id */ +
+      4 /* ack timeout */ +
+      4 /* controller epoch */ +
+      4 /* number of partitions */
     for((key, value) <- partitionStateInfos)
-      size += (2 + key._1.length) + 4 + value.sizeInBytes
-    size += 4
+      size += (2 + key._1.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
+    size += 4 /* number of leader brokers */
     for(broker <- leaders)
-      size += broker.sizeInBytes
+      size += broker.sizeInBytes /* broker info */
     size
   }
 }
\ No newline at end of file
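Putting the pieces together, the controller now stamps its epoch both per partition and on the request envelope: the envelope epoch is what ReplicaManager checks before applying anything, while the per-partition epoch records which controller generation made that leader decision. A minimal construction sketch (illustrative values, assuming a liveBrokers: Set[Broker] in scope):

    val leaderIsrAndEpoch = LeaderIsrAndControllerEpoch(new LeaderAndIsr(0, List(0, 1)), 2)
    val stateInfos = Map(("test", 0) -> PartitionStateInfo(leaderIsrAndEpoch, 2))
    val request = new LeaderAndIsrRequest(stateInfos, liveBrokers, 2) // trailing arg: controllerEpoch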
Index: core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala (revision 1409619)
+++ core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala (working copy)
@@ -17,6 +17,7 @@

 package kafka.api

+import kafka.common.ErrorMapping
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import collection.mutable.HashMap
@@ -26,6 +27,7 @@
 object LeaderAndIsrResponse {
   def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = {
     val versionId = buffer.getShort
+    val errorCode = buffer.getShort
     val numEntries = buffer.getInt
     val responseMap = new HashMap[(String, Int), Short]()
     for (i<- 0 until numEntries){
@@ -34,24 +36,32 @@
       val partitionErrorCode = buffer.getShort
       responseMap.put((topic, partition), partitionErrorCode)
     }
-    new LeaderAndIsrResponse(versionId, responseMap)
+    new LeaderAndIsrResponse(versionId, responseMap, errorCode)
   }
 }

 case class LeaderAndIsrResponse(versionId: Short,
-                                responseMap: Map[(String, Int), Short])
+                                responseMap: Map[(String, Int), Short],
+                                errorCode: Short = ErrorMapping.NoError)
         extends RequestOrResponse {
   def sizeInBytes(): Int ={
-    var size = 2 + 4
-    for ((key, value) <- responseMap){
-      size += 2 + key._1.length + 4 + 2
+    var size =
+      2 /* version id */ +
+      2 /* error code */ +
+      4 /* number of responses */
+    for ((key, value) <- responseMap) {
+      size +=
+        2 + key._1.length /* topic */ +
+        4 /* partition */ +
+        2 /* error code for this partition */
     }
     size
   }

   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
+    buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
     for ((key:(String, Int), value) <- responseMap){
       writeShortString(buffer, key._1)