diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 2f706c9..70d1b81 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -104,10 +104,14 @@ object ReassignPartitionsCommand extends Logging {
         }
       }
     } else if(options.has(topicsToMoveJsonFileOpt)) {
+      if(!options.has(brokerListOpt)) {
+        System.err.println("broker-list is required if topics-to-move-json-file is used")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
       val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt)
-      val brokerList = options.valueOf(brokerListOpt)
+      val brokerListToReassign = options.valueOf(brokerListOpt).split(',').map(_.toInt)
       val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
-      val brokerListToReassign = brokerList.split(',') map (_.toInt)
       val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
       val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
@@ -117,7 +121,6 @@ object ReassignPartitionsCommand extends Logging {
                                                              topicInfo._2.head._2.size)
         partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
       }
-
     } else if (options.has(manualAssignmentJsonFileOpt)) {
       val manualAssignmentJsonFile = options.valueOf(manualAssignmentJsonFileOpt)
       val manualAssignmentJsonString = Utils.readFileAsString(manualAssignmentJsonFile)
@@ -175,8 +178,11 @@ object ReassignPartitionsCommand extends Logging {
       val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
       if(assignedReplicas == newReplicas)
         ReassignmentCompleted
-      else
+      else {
+        println(("ERROR: Assigned replicas (%s) don't match the list of replicas for reassignment (%s)" +
+          " for partition %s").format(assignedReplicas.mkString(","), newReplicas.mkString(","), topicAndPartition))
         ReassignmentFailed
+      }
     }
   }
 }
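
Aside (not part of the patch): the last hunk above only adds an error message around the existing comparison
between the replicas currently assigned in ZooKeeper and the requested target list. A minimal, self-contained
sketch of that check, with illustrative names rather than the tool's real classes:

// Sketch: how a per-partition reassignment is judged complete or failed.
object VerifyReassignmentSketch {
  sealed trait ReassignmentStatus
  case object ReassignmentCompleted extends ReassignmentStatus
  case object ReassignmentFailed extends ReassignmentStatus

  // A reassignment is complete only if the replicas currently assigned in ZooKeeper
  // exactly match (including order) the target list from the reassignment JSON.
  def checkStatus(partition: String, assigned: Seq[Int], target: Seq[Int]): ReassignmentStatus =
    if (assigned == target)
      ReassignmentCompleted
    else {
      println(("ERROR: Assigned replicas (%s) don't match the list of replicas for reassignment (%s)" +
        " for partition %s").format(assigned.mkString(","), target.mkString(","), partition))
      ReassignmentFailed
    }

  def main(args: Array[String]) {
    // prints the ERROR line, then ReassignmentFailed
    println(checkStatus("[foo,0]", assigned = Seq(1, 2), target = Seq(2, 3)))
  }
}
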
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 5ccecd1..2e9f693 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -132,6 +132,9 @@ class Partition(val topic: String,
     assignedReplicaMap.values.toSet
   }
 
+  def removeReplica(replicaId: Int) {
+    assignedReplicaMap.remove(replicaId)
+  }
 
   /**
    * If the leaderEpoch of the incoming request is higher than locally cached epoch, make the local replica the leader in the following steps.
@@ -140,7 +143,7 @@ class Partition(val topic: String,
    * 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
    * 4. set the new leader and ISR
    */
-  def makeLeader(controllerId: Int, topic: String, partitionId: Int,
+  def makeLeader(controllerId: Int, topic: String, partitionId: Int, allReplicas: Set[Int],
                  leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -157,7 +160,11 @@ class Partition(val topic: String,
       // stop replica fetcher thread, if any
       replicaFetcherManager.removeFetcher(topic, partitionId)
 
+      allReplicas.foreach(replica => getOrCreateReplica(replica))
       val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
+      // remove assigned replicas that have been removed by the controller
+      val assignedReplicasToBeDeleted = assignedReplicas().map(_.brokerId) -- allReplicas
+      assignedReplicasToBeDeleted.foreach(removeReplica(_))
       // reset LogEndOffset for remote replicas
       assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
       inSyncReplicas = newInSyncReplicas
@@ -177,7 +184,8 @@ class Partition(val topic: String,
    * 3. set the leader and set ISR to empty
    * 4. start a fetcher to the new leader
    */
-  def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+  def makeFollower(controllerId: Int, topic: String, partitionId: Int, allReplicas: Set[Int],
+                   leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                    leaders: Set[Broker], correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
@@ -203,6 +211,11 @@ class Partition(val topic: String,
       // stop fetcher thread to previous leader
       replicaFetcherManager.removeFetcher(topic, partitionId)
       localReplica.log.get.truncateTo(localReplica.highWatermark)
+      // add replicas that are new
+      allReplicas.foreach(r => getOrCreateReplica(r))
+      // remove assigned replicas that have been removed by the controller
+      val assignedReplicasToBeDeleted = assignedReplicas().map(_.brokerId) -- allReplicas
+      assignedReplicasToBeDeleted.foreach(removeReplica(_))
       logManager.checkpointRecoveryPointOffsets()
       inSyncReplicas = Set.empty[Replica]
       leaderEpoch = leaderAndIsr.leaderEpoch
@@ -230,7 +243,13 @@ class Partition(val topic: String,
   def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
     leaderIsrUpdateLock synchronized {
       debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
-      val replica = getOrCreateReplica(replicaId)
+      val replicaOpt = getReplica(replicaId)
+      if(!replicaOpt.isDefined) {
+        throw new IllegalStateException(("Leader %d failed to record follower %d's position %d for partition [%s,%d] since the replica %d" +
+          " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
+          offset, topic, partitionId, replicaId, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
+      }
+      val replica = replicaOpt.get
       replica.logEndOffset = offset
 
       // check if this replica needs to be added to the ISR
@@ -238,7 +257,11 @@ class Partition(val topic: String,
         case Some(leaderReplica) =>
           val replica = getReplica(replicaId).get
           val leaderHW = leaderReplica.highWatermark
-          if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
+          // For a replica to get added back to ISR, it has to satisfy 3 conditions-
+          // 1. It is not already in the ISR
+          // 2. It is part of the assigned replica list. See KAFKA-1097
+          // 3. It's log end offset >= leader's highwatermark
+          if (!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset >= leaderHW) {
             // expand ISR
             val newInSyncReplicas = inSyncReplicas + replica
             info("Expanding ISR for partition [%s,%d] from %s to %s"
@@ -367,7 +390,7 @@ class Partition(val topic: String,
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
       ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
       ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
-    if (updateSucceeded){
+    if (updateSucceeded) {
       inSyncReplicas = newIsr
       zkVersion = newVersion
       trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
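
Aside (not part of the patch): the behavioural change in Partition.scala above is the ISR-expansion guard in
updateLeaderHWAndMaybeExpandIsr: a follower may only re-enter the ISR if it is still in the assigned replica list.
A self-contained sketch of just that predicate, with simplified types and illustrative names:

// Sketch: the three conditions a follower must satisfy before the leader expands the ISR.
object IsrExpansionSketch {
  def mayExpandIsr(isr: Set[Int], assignedReplicas: Set[Int], replicaId: Int,
                   replicaLogEndOffset: Long, leaderHighWatermark: Long): Boolean =
    !isr.contains(replicaId) &&                    // 1. not already in the ISR
      assignedReplicas.contains(replicaId) &&      // 2. still one of the assigned replicas (the KAFKA-1097 guard)
      replicaLogEndOffset >= leaderHighWatermark   // 3. caught up to the leader's high watermark

  def main(args: Array[String]) {
    // Replica 3 was dropped from the assignment during reassignment; even though it has
    // caught up to the high watermark, it must not be added back to the ISR.
    println(mayExpandIsr(isr = Set(1, 2), assignedReplicas = Set(1, 2), replicaId = 3,
                         replicaLogEndOffset = 100L, leaderHighWatermark = 100L))  // false
  }
}
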
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 88d130f..608c2fd 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -35,6 +35,7 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
 import scala.Some
 import kafka.common.TopicAndPartition
+import org.apache.log4j.Logger
 
 class ControllerContext(val zkClient: ZkClient,
                         val zkSessionTimeout: Int,
@@ -105,6 +106,7 @@ object KafkaController extends Logging {
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
   val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
   private val partitionStateMachine = new PartitionStateMachine(this)
   private val replicaStateMachine = new ReplicaStateMachine(this)
@@ -359,17 +361,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * Reassigning replicas for a partition goes through a few stages -
    * RAR = Reassigned replicas
    * AR = Original list of replicas for partition
-   * 1. Start new replicas RAR - AR.
-   * 2. Wait until new replicas are in sync with the leader
-   * 3. If the leader is not in RAR, elect a new leader from RAR
-   * 4. Stop old replicas AR - RAR
-   * 5. Write new AR
-   * 6. Remove partition from the /admin/reassign_partitions path
+   * 1. Write new AR = AR + RAR
+   * 2. Start new replicas RAR - AR.
+   * 3. Wait until new replicas are in sync with the leader
+   * 4. If the leader is not in RAR, elect a new leader from RAR
+   * 5. Stop old replicas AR - RAR
+   * 6. Write new AR = RAR
+   * 7. Remove partition from the /admin/reassign_partitions path
    */
   def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
     areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
       case true =>
+        val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
         // mark the new replicas as online
         reassignedReplicas.foreach { replica =>
           replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
@@ -378,9 +382,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         // check if current leader is in the new replicas list. If not, controller needs to trigger leader election
         moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
         // stop older replicas
-        stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext)
+        stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
         // write the new list of replicas for this partition in zookeeper
-        updateAssignedReplicasForPartition(topicAndPartition, reassignedPartitionContext)
+        updateAssignedReplicasForPartition(topicAndPartition, reassignedPartitionContext.newReplicas)
         // update the /admin/reassign_partitions path to remove this partition
         removePartitionFromReassignedPartitions(topicAndPartition)
         info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
@@ -390,8 +394,15 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       case false =>
         info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
           "reassigned not yet caught up with the leader")
+        val newReplicas = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
+        val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
+        // write the expanded list of replicas to zookeeper
+        updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
+        // update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
+        updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
+          newAndOldReplicas.toSeq)
         // start new replicas
-        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext)
+        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicas)
         info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
           "reassigned to catch up with the leader")
     }
@@ -602,6 +613,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
                                                         reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
     val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
+    // change the assigned replica list to just the reassigned replicas in the cache so it gets sent out on the LeaderAndIsr
+    // request to the current or new leader. This will prevent it from adding the old replicas to the ISR
+    val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
+    controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
     if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
       info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
@@ -613,6 +628,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       case true =>
         info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
           "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
+        // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
+        updateLeaderEpochAndSendRequest(topicAndPartition, oldAndNewReplicas, reassignedReplicas)
       case false =>
         info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
           "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
@@ -622,12 +639,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   private def stopOldReplicasOfReassignedPartition(topicAndPartition: TopicAndPartition,
-                                                   reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
+                                                   reassignedPartitionContext: ReassignedPartitionsContext,
+                                                   oldReplicas: Set[Int]) {
     val topic = topicAndPartition.topic
     val partition = topicAndPartition.partition
-    // send stop replica state change request to the old replicas
-    val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
     // first move the replica to offline state (the controller removes it from the ISR)
     oldReplicas.foreach { replica =>
       replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topic, partition, replica)), OfflineReplica)
@@ -639,31 +654,44 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
-                                                 reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
+                                                 replicas: Seq[Int]) {
     val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic))
-    partitionsAndReplicasForThisTopic.put(topicAndPartition, reassignedReplicas)
+    partitionsAndReplicasForThisTopic.put(topicAndPartition, replicas)
     updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic)
-    info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, reassignedReplicas.mkString(",")))
+    info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, replicas.mkString(",")))
     // update the assigned replica list after a successful zookeeper write
-    controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
-    // stop watching the ISR changes for this partition
-    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
-      controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
+    controllerContext.partitionReplicaAssignment.put(topicAndPartition, replicas)
   }
 
   private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
-                                                     reassignedPartitionContext: ReassignedPartitionsContext) {
+                                                     reassignedPartitionContext: ReassignedPartitionsContext,
+                                                     newReplicas: Set[Int]) {
     // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
     // replicas list
-    val assignedReplicaSet = Set.empty[Int] ++ controllerContext.partitionReplicaAssignment(topicAndPartition)
-    val reassignedReplicaSet = Set.empty[Int] ++ reassignedPartitionContext.newReplicas
-    val newReplicas: Seq[Int] = (reassignedReplicaSet -- assignedReplicaSet).toSeq
     newReplicas.foreach { replica =>
       replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
         replica)), NewReplica)
     }
   }
 
+  private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
+    brokerRequestBatch.newBatch()
+    updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
+      case Some(updatedLeaderIsrAndControllerEpoch) =>
+        // send the shrunk assigned replica list to all the replicas, including the leader, so that it no longer
+        // allows old replicas to enter ISR
+        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
+          topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
+        brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement)
+        stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " +
+          "to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch,
+          newAssignedReplicas.mkString(","), updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader, topicAndPartition))
+      case None => // fail the reassignment
+        stateChangeLogger.error(("Controller %d epoch %d failed to send LeaderAndIsr request with new assigned replica list %s " +
+          "to leader for partition being reassigned %s").format(config.brokerId, controllerContext.epoch,
+          newAssignedReplicas.mkString(","), topicAndPartition))
+    }
+  }
+
   private def registerReassignedPartitionsListener() = {
     zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, new PartitionsReassignedListener(this))
   }
@@ -677,6 +705,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
+    // stop watching the ISR changes for this partition
+    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+      controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
     // read the current list of reassigned partitions from zookeeper
     val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
     // remove this partition from that list
@@ -793,6 +824,51 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     finalLeaderIsrAndControllerEpoch
   }
 
+  /**
+   * Does not change leader or isr, but just increments the leader epoch
+   * @param topic topic
+   * @param partition partition
+   * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
+   */
+  private def updateLeaderEpoch(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
+    val topicAndPartition = TopicAndPartition(topic, partition)
+    debug("Updating leader epoch for partition %s.".format(topicAndPartition))
+    var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
+    var zkWriteCompleteOrUnnecessary = false
+    while (!zkWriteCompleteOrUnnecessary) {
+      // refresh leader and isr from zookeeper again
+      val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+      zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
+        case Some(leaderIsrAndEpoch) =>
+          val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
+          val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
+          if(controllerEpoch > epoch)
+            throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
+              "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
+              "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
+          // increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded
+          // assigned replica list
+          val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
+                                                 leaderAndIsr.isr, leaderAndIsr.zkVersion + 1)
+          // update the new leadership decision in zookeeper or retry
+          val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
+            zkClient,
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+            ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
+            leaderAndIsr.zkVersion)
+          newLeaderAndIsr.zkVersion = newVersion
+          finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
+          if (updateSucceeded)
+            info("Updated leader epoch for partition %s to %d".format(topicAndPartition, newLeaderAndIsr.leaderEpoch))
+          updateSucceeded
+        case None =>
+          warn("Cannot update leader epoch for partition %s as leaderAndIsr path is empty".format(topicAndPartition))
+          true
+      }
+    }
+    finalLeaderIsrAndControllerEpoch
+  }
+
   class SessionExpirationListener() extends IZkStateListener with Logging {
     this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
     @throws(classOf[Exception])
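
Aside (not part of the patch): to make the AR/RAR set arithmetic used in onPartitionReassignment above easier to
follow, here is a tiny, self-contained sketch of the quantities the controller derives. The values are illustrative
only; this is not controller code.

// Sketch: AR/RAR set arithmetic used during partition reassignment.
object ReassignmentSetsSketch {
  def main(args: Array[String]) {
    val ar  = Seq(1, 2, 3)   // AR: original assigned replicas
    val rar = Seq(3, 4, 5)   // RAR: requested reassigned replicas

    val newAndOldReplicas = (rar ++ ar).toSet      // step 1: expanded AR written while RAR catches up
    val newReplicas       = rar.toSet -- ar.toSet  // step 2: replicas to create and start (RAR - AR)
    val oldReplicas       = ar.toSet -- rar.toSet  // step 5: replicas to stop once RAR is in sync (AR - RAR)

    println("expanded AR  = " + newAndOldReplicas.toSeq.sorted.mkString(","))  // 1,2,3,4,5
    println("new replicas = " + newReplicas.toSeq.sorted.mkString(","))        // 4,5
    println("old replicas = " + oldReplicas.toSeq.sorted.mkString(","))        // 1,2
    println("final AR     = " + rar.mkString(","))                             // 3,4,5 (step 6)
  }
}
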
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 212c05d..c52225a 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -165,6 +165,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           replicaState.put((topic, partition, replicaId), OnlineReplica)
         case OfflineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
+          // send stop replica command to the replica so that it stops fetching from the leader
+          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
           // As an optimization, the controller removes dead replicas from the ISR
           val leaderAndIsrIsEmpty: Boolean = controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index ee1cc0c..c525740 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -256,7 +256,7 @@ class ReplicaManager(val config: KafkaConfig,
             "starting the become-leader transition for partition [%s,%d]")
             .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeLeader(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, correlationId)) {
+    if (partition.makeLeader(controllerId, topic, partitionId, partitionStateInfo.allReplicas, leaderIsrAndControllerEpoch, correlationId)) {
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
@@ -273,7 +273,8 @@ class ReplicaManager(val config: KafkaConfig,
             .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId))
 
     val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
-    if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, leaders, correlationId)) {
+    if (partition.makeFollower(controllerId, topic, partitionId, partitionStateInfo.allReplicas,
+      leaderIsrAndControllerEpoch, leaders, correlationId)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition
diff --git a/kafka-patch-review.py b/kafka-patch-review.py
index daf2c35..7fa6cb5 100644
--- a/kafka-patch-review.py
+++ b/kafka-patch-review.py
@@ -95,12 +95,12 @@ def main():
   comment="Created reviewboard "
   if not opt.reviewboard:
-    print 'Created a new reviewboard ',rb_url,' against branch ',opt.branch
+    print 'Created a new reviewboard ',rb_url,
   else:
-    print 'Updated reviewboard',opt.reviewboard
+    print 'Updated reviewboard'
     comment="Updated reviewboard "
-  comment = comment + rb_url
+  comment = comment + rb_url + ' against branch ' + opt.branch
   jira.add_comment(opt.jira, comment)
 
 if __name__ == '__main__':
diff --git a/project/Build.scala b/project/Build.scala
index bcd1ca5..40e0c4f 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -44,7 +44,7 @@ object KafkaBuild extends Build {
     crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2", "2.10.1"),
     excludeFilter in unmanagedSources <<= scalaVersion(v => if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala"),
     scalaVersion := "2.8.0",
-    version := "0.8.0",
+    version := "0.8.1",
     publishTo := Some("Apache Maven Repo" at "https://repository.apache.org/service/local/staging/deploy/maven2"),
     credentials += Credentials(Path.userHome / ".m2" / ".credentials"),
     buildNumber := System.getProperty("build.number", ""),
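
Aside (not part of the patch): the ReplicaManager hunks above simply thread partitionStateInfo.allReplicas through
to Partition.makeLeader/makeFollower. A simplified, self-contained sketch of the reconciliation those methods then
perform against their local replica map; a plain String stands in for the real Replica objects, and all names here
are illustrative only:

// Sketch: reconcile a broker's local replica map with the controller's full replica list.
import scala.collection.mutable

object ReplicaReconcileSketch {
  def reconcile(assignedReplicaMap: mutable.Map[Int, String], allReplicas: Set[Int]) {
    // create entries for replicas the controller added
    allReplicas.foreach(id => assignedReplicaMap.getOrElseUpdate(id, "replica-" + id))
    // drop entries for replicas the controller removed, so they can never re-enter the ISR
    val toDelete = assignedReplicaMap.keySet.toSet -- allReplicas
    toDelete.foreach(assignedReplicaMap.remove(_))
  }

  def main(args: Array[String]) {
    val local = mutable.Map(1 -> "replica-1", 2 -> "replica-2", 3 -> "replica-3")
    reconcile(local, allReplicas = Set(2, 3, 4))
    println(local.keySet.toSeq.sorted.mkString(","))  // 2,3,4
  }
}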