diff --git a/config/server.properties b/config/server.properties index d36279e..8efa83f 100644 --- a/config/server.properties +++ b/config/server.properties @@ -97,7 +97,7 @@ log.segment.bytes=536870912 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies -log.retention.check.interval.ms=60000 +log.cleanup.interval.mins=1 ############################# Zookeeper ############################# diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 70d1b81..2f706c9 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -104,14 +104,10 @@ object ReassignPartitionsCommand extends Logging { } } } else if(options.has(topicsToMoveJsonFileOpt)) { - if(!options.has(brokerListOpt)) { - System.err.println("broker-list is required if topics-to-move-json-file is used") - parser.printHelpOn(System.err) - System.exit(1) - } val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt) - val brokerListToReassign = options.valueOf(brokerListOpt).split(',').map(_.toInt) + val brokerList = options.valueOf(brokerListOpt) val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) + val brokerListToReassign = brokerList.split(',') map (_.toInt) val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) @@ -121,6 +117,7 @@ object ReassignPartitionsCommand extends Logging { topicInfo._2.head._2.size) partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2)) } + } else if (options.has(manualAssignmentJsonFileOpt)) { val manualAssignmentJsonFile = options.valueOf(manualAssignmentJsonFileOpt) val manualAssignmentJsonString = Utils.readFileAsString(manualAssignmentJsonFile) @@ -178,11 +175,8 @@ object ReassignPartitionsCommand extends Logging { val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) if(assignedReplicas == newReplicas) ReassignmentCompleted - else { - println(("ERROR: Assigned replicas (%s) don't match the list of replicas for reassignment (%s)" + - " for partition %s").format(assignedReplicas.mkString(","), newReplicas.mkString(","), topicAndPartition)) + else ReassignmentFailed - } } } } diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 02ccc17..d8078bd 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -20,7 +20,7 @@ import scala.collection._ import kafka.admin.AdminUtils import kafka.utils._ import java.lang.Object -import kafka.api.{PartitionStateInfo, LeaderAndIsr} +import kafka.api.LeaderAndIsr import kafka.log.LogConfig import kafka.server.ReplicaManager import com.yammer.metrics.core.Gauge @@ -28,7 +28,7 @@ import kafka.metrics.KafkaMetricsGroup import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController} import org.apache.log4j.Logger import kafka.message.ByteBufferMessageSet -import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping} +import kafka.common.{TopicAndPartition, NotLeaderForPartitionException, ErrorMapping} /** @@ -131,34 +131,25 @@ class Partition(val topic: 
String, assignedReplicaMap.values.toSet } - def removeReplica(replicaId: Int) { - assignedReplicaMap.remove(replicaId) - } - def getLeaderEpoch(): Int = { leaderIsrUpdateLock synchronized { return this.leaderEpoch } } + /** * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time) * and setting the new leader and ISR */ - def makeLeader(controllerId: Int, - partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = { + def makeLeader(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = { leaderIsrUpdateLock synchronized { - val allReplicas = partitionStateInfo.allReplicas - val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr // record the epoch of the controller that made the leadership decision. This is useful while updating the isr // to maintain the decision maker controller's epoch in the zookeeper path controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch - // add replicas that are new - allReplicas.foreach(replica => getOrCreateReplica(replica)) + val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet - // remove assigned replicas that have been removed by the controller - (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) // reset LogEndOffset for remote replicas assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset) inSyncReplicas = newInSyncReplicas @@ -174,24 +165,16 @@ class Partition(val topic: String, /** * Make the local replica the follower by setting the new leader and ISR to empty */ - def makeFollower(controllerId: Int, - partitionStateInfo: PartitionStateInfo, - leaders: Set[Broker], correlationId: Int): Boolean = { + def makeFollower(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, leaders: Set[Broker], correlationId: Int): Boolean = { leaderIsrUpdateLock synchronized { - val allReplicas = partitionStateInfo.allReplicas - val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr val newLeaderBrokerId: Int = leaderAndIsr.leader - // record the epoch of the controller that made the leadership decision. This is useful while updating the isr - // to maintain the decision maker controller's epoch in the zookeeper path - controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 leaders.find(_.id == newLeaderBrokerId) match { case Some(leaderBroker) => - // add replicas that are new - allReplicas.foreach(r => getOrCreateReplica(r)) - // remove assigned replicas that have been removed by the controller - (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) + // record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr + // to maintain the decision maker controller's epoch in the zookeeper path + controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch inSyncReplicas = Set.empty[Replica] leaderEpoch = leaderAndIsr.leaderEpoch zkVersion = leaderAndIsr.zkVersion @@ -209,13 +192,7 @@ class Partition(val topic: String, def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) { leaderIsrUpdateLock synchronized { debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId)) - val replicaOpt = getReplica(replicaId) - if(!replicaOpt.isDefined) { - throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d for partition [%s,%d] since the replica %d" + - " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId, - offset, topic, partitionId, replicaId, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId)) - } - val replica = replicaOpt.get + val replica = getOrCreateReplica(replicaId) replica.logEndOffset = offset // check if this replica needs to be added to the ISR @@ -223,11 +200,7 @@ class Partition(val topic: String, case Some(leaderReplica) => val replica = getReplica(replicaId).get val leaderHW = leaderReplica.highWatermark - // For a replica to get added back to ISR, it has to satisfy 3 conditions- - // 1. It is not already in the ISR - // 2. It is part of the assigned replica list. See KAFKA-1097 - // 3. It's log end offset >= leader's highwatermark - if (!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset >= leaderHW) { + if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) { // expand ISR val newInSyncReplicas = inSyncReplicas + replica info("Expanding ISR for partition [%s,%d] from %s to %s" diff --git a/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala b/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala deleted file mode 100644 index 409d112..0000000 --- a/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.common - -class NotAssignedReplicaException(message: String, cause: Throwable) extends RuntimeException(message, cause) { - def this(message: String) = this(message, null) - def this() = this(null, null) -} diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 88792c2..88d130f 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -35,7 +35,6 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} import java.util.concurrent.atomic.AtomicInteger import scala.Some import kafka.common.TopicAndPartition -import org.apache.log4j.Logger class ControllerContext(val zkClient: ZkClient, val zkSessionTimeout: Int, @@ -106,7 +105,6 @@ object KafkaController extends Logging { class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true - private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger) val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs) private val partitionStateMachine = new PartitionStateMachine(this) private val replicaStateMachine = new ReplicaStateMachine(this) @@ -361,19 +359,17 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg * Reassigning replicas for a partition goes through a few stages - * RAR = Reassigned replicas * AR = Original list of replicas for partition - * 1. Write new AR = AR + RAR - * 2. Start new replicas RAR - AR. - * 3. Wait until new replicas are in sync with the leader - * 4. If the leader is not in RAR, elect a new leader from RAR - * 5. Stop old replicas AR - RAR - * 6. Write new AR = RAR - * 7. Remove partition from the /admin/reassign_partitions path + * 1. Start new replicas RAR - AR. + * 2. Wait until new replicas are in sync with the leader + * 3. If the leader is not in RAR, elect a new leader from RAR + * 4. Stop old replicas AR - RAR + * 5. Write new AR + * 6. Remove partition from the /admin/reassign_partitions path */ def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) { val reassignedReplicas = reassignedPartitionContext.newReplicas areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match { case true => - val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet // mark the new replicas as online reassignedReplicas.foreach { replica => replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, @@ -382,9 +378,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // check if current leader is in the new replicas list. 
If not, controller needs to trigger leader election moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext) // stop older replicas - stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas) + stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext) // write the new list of replicas for this partition in zookeeper - updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas) + updateAssignedReplicasForPartition(topicAndPartition, reassignedPartitionContext) // update the /admin/reassign_partitions path to remove this partition removePartitionFromReassignedPartitions(topicAndPartition) info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition)) @@ -394,15 +390,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg case false => info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + "reassigned not yet caught up with the leader") - val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet - val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet - // write the expanded list of replicas to zookeeper - updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq) - // update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest - updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition), - newAndOldReplicas.toSeq) // start new replicas - startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList) + startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext) info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + "reassigned to catch up with the leader") } @@ -613,10 +602,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg reassignedPartitionContext: ReassignedPartitionsContext) { val reassignedReplicas = reassignedPartitionContext.newReplicas val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader - // change the assigned replica list to just the reassigned replicas in the cache so it gets sent out on the LeaderAndIsr - // request to the current or new leader. This will prevent it from adding the old replicas to the ISR - val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) - controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas) if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) { info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) + "is not in the new list of replicas %s. 
Re-electing leader".format(reassignedReplicas.mkString(","))) @@ -628,8 +613,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg case true => info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) + "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(","))) - // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest - updateLeaderEpochAndSendRequest(topicAndPartition, oldAndNewReplicas, reassignedReplicas) case false => info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) + "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(","))) @@ -639,10 +622,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } private def stopOldReplicasOfReassignedPartition(topicAndPartition: TopicAndPartition, - reassignedPartitionContext: ReassignedPartitionsContext, - oldReplicas: Set[Int]) { + reassignedPartitionContext: ReassignedPartitionsContext) { + val reassignedReplicas = reassignedPartitionContext.newReplicas val topic = topicAndPartition.topic val partition = topicAndPartition.partition + // send stop replica state change request to the old replicas + val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet // first move the replica to offline state (the controller removes it from the ISR) oldReplicas.foreach { replica => replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topic, partition, replica)), OfflineReplica) @@ -654,44 +639,31 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition, - replicas: Seq[Int]) { + reassignedPartitionContext: ReassignedPartitionsContext) { + val reassignedReplicas = reassignedPartitionContext.newReplicas val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic)) - partitionsAndReplicasForThisTopic.put(topicAndPartition, replicas) + partitionsAndReplicasForThisTopic.put(topicAndPartition, reassignedReplicas) updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic) - info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, replicas.mkString(","))) + info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, reassignedReplicas.mkString(","))) // update the assigned replica list after a successful zookeeper write - controllerContext.partitionReplicaAssignment.put(topicAndPartition, replicas) + controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas) + // stop watching the ISR changes for this partition + zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), + controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener) } private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition, - reassignedPartitionContext: ReassignedPartitionsContext, - newReplicas: Set[Int]) { + reassignedPartitionContext: ReassignedPartitionsContext) { // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned // replicas list + val 
assignedReplicaSet = Set.empty[Int] ++ controllerContext.partitionReplicaAssignment(topicAndPartition) + val reassignedReplicaSet = Set.empty[Int] ++ reassignedPartitionContext.newReplicas + val newReplicas: Seq[Int] = (reassignedReplicaSet -- assignedReplicaSet).toSeq newReplicas.foreach { replica => replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica) } } - private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) { - brokerRequestBatch.newBatch() - updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match { - case Some(updatedLeaderIsrAndControllerEpoch) => - // send the shrunk assigned replica list to all the replicas, including the leader, so that it no longer - // allows old replicas to enter ISR - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic, - topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas) - brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement) - stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " + - "to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch, - newAssignedReplicas.mkString(","), updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader, topicAndPartition)) - case None => // fail the reassignment - stateChangeLogger.error(("Controller %d epoch %d failed to send LeaderAndIsr request with new assigned replica list %s " + - "to leader for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, - newAssignedReplicas.mkString(","), topicAndPartition)) - } - } - private def registerReassignedPartitionsListener() = { zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, new PartitionsReassignedListener(this)) } @@ -705,9 +677,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) { - // stop watching the ISR changes for this partition - zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), - controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener) // read the current list of reassigned partitions from zookeeper val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient) // remove this partition from that list @@ -824,52 +793,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg finalLeaderIsrAndControllerEpoch } - /** - * Does not change leader or isr, but just increments the leader epoch - * @param topic topic - * @param partition partition - * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty. 
- */ - private def updateLeaderEpoch(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = { - val topicAndPartition = TopicAndPartition(topic, partition) - debug("Updating leader epoch for partition %s.".format(topicAndPartition)) - var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None - var zkWriteCompleteOrUnnecessary = false - while (!zkWriteCompleteOrUnnecessary) { - // refresh leader and isr from zookeeper again - val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) - zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match { - case Some(leaderIsrAndEpoch) => - val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr - val controllerEpoch = leaderIsrAndEpoch.controllerEpoch - if(controllerEpoch > epoch) - throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" + - "means the current controller with epoch %d went through a soft failure and another ".format(epoch) + - "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch)) - // increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded - // assigned replica list - val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1, - leaderAndIsr.isr, leaderAndIsr.zkVersion + 1) - // update the new leadership decision in zookeeper or retry - val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath( - zkClient, - ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), - ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch), - leaderAndIsr.zkVersion) - newLeaderAndIsr.zkVersion = newVersion - finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch)) - if (updateSucceeded) - info("Updated leader epoch for partition %s to %d".format(topicAndPartition, newLeaderAndIsr.leaderEpoch)) - updateSucceeded - case None => - throw new IllegalStateException(("Cannot update leader epoch for partition %s as leaderAndIsr path is empty. 
" + - "This could mean we somehow tried to reassign a partition that doesn't exist").format(topicAndPartition)) - true - } - } - finalLeaderIsrAndControllerEpoch - } - class SessionExpirationListener() extends IZkStateListener with Logging { this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], " @throws(classOf[Exception]) diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index c52225a..212c05d 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -165,8 +165,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { replicaState.put((topic, partition, replicaId), OnlineReplica) case OfflineReplica => assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState) - // send stop replica command to the replica so that it stops fetching from the leader - brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false) // As an optimization, the controller removes dead replicas from the ISR val leaderAndIsrIsEmpty: Boolean = controllerContext.partitionLeadershipInfo.get(topicAndPartition) match { diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 2f4e303..80dd430 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -211,7 +211,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".") } else { throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s." 
- .format(offset, entries, lastOffset, file.getAbsolutePath)) + .format(offset, entries, lastOffset, file.getName)) } } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 161f581..7b8f89e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -28,7 +28,7 @@ import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import java.util.concurrent.TimeUnit import kafka.common._ -import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest} +import kafka.api.{StopReplicaRequest, LeaderAndIsrRequest} import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController} import org.apache.log4j.Logger @@ -143,7 +143,7 @@ class ReplicaManager(val config: KafkaConfig, controllerEpoch = stopReplicaRequest.controllerEpoch val responseMap = new HashMap[(String, Int), Short] // First stop fetchers for all partitions, then stop the corresponding replicas - replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map { + replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map{ case (topic, partition) => TopicAndPartition(topic, partition) }) for((topic, partitionId) <- stopReplicaRequest.partitions){ @@ -222,27 +222,27 @@ class ReplicaManager(val config: KafkaConfig, controllerEpoch = leaderAndISRRequest.controllerEpoch // First check partition's leader epoch - val partitionState = new HashMap[Partition, PartitionStateInfo]() + val partitionleaderIsrAndControllerEpoch = new HashMap[Partition, LeaderIsrAndControllerEpoch]() leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) => val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor) val partitionLeaderEpoch = partition.getLeaderEpoch() if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) { // If the leader epoch is valid record the epoch of the controller that made the leadership decision. 
// This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path - partitionState.put(partition, partitionStateInfo) + partitionleaderIsrAndControllerEpoch.put(partition, partitionStateInfo.leaderIsrAndControllerEpoch) } else { // Otherwise record the error code in response stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " + "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d") .format(localBrokerId, correlationId, controllerId, partitionStateInfo.leaderIsrAndControllerEpoch.controllerEpoch, - partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch)) + partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition, partitionLeaderEpoch)) responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode) } } - val partitionsTobeLeader = partitionState - .filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId} - val partitionsTobeFollower = (partitionState -- partitionsTobeLeader.keys) + val partitionsTobeLeader = partitionleaderIsrAndControllerEpoch + .filter{ case (partition, leaderIsrAndControllerEpoch) => leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId} + val partitionsTobeFollower = (partitionleaderIsrAndControllerEpoch -- partitionsTobeLeader.keys) if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap) if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap) @@ -271,30 +271,28 @@ class ReplicaManager(val config: KafkaConfig, * the error message will be set on each partition since we do not know which partition caused it * TODO: the above may need to be fixed later */ - private def makeLeaders(controllerId: Int, epoch: Int, - partitionState: Map[Partition, PartitionStateInfo], + private def makeLeaders(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) = { stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-leader transition for partitions %s") .format(localBrokerId, correlationId, controllerId, epoch, - partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","))) + partitionLeaderISRAndControllerEpochs.keySet.mkString(","))) - for (partition <- partitionState.keys) + for (partition <- partitionLeaderISRAndControllerEpochs.keys) responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) try { // First stop fetchers for all the partitions - replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) + replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_))) stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d" - .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId)) + .format(localBrokerId, 
partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId)) // Update the partition information to be the leader - partitionState.foreach{ case (partition, partitionStateInfo) => - partition.makeLeader(controllerId, partitionStateInfo, correlationId)} + partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeLeader(controllerId, leaderIsrAndControllerEpoch, correlationId)} // Finally add these partitions to the list of partitions for which the leader is the current broker leaderPartitionsLock synchronized { - leaderPartitions ++= partitionState.keySet + leaderPartitions ++= partitionLeaderISRAndControllerEpochs.keySet } } catch { case e: Throwable => @@ -307,8 +305,7 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + "for the become-leader transition for partitions %s") - .format(localBrokerId, correlationId, controllerId, epoch, - partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","))) + .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(","))) } /* @@ -328,33 +325,30 @@ class ReplicaManager(val config: KafkaConfig, * the error message will be set on each partition since we do not know which partition caused it * TODO: the above may need to be fixed later */ - private def makeFollowers(controllerId: Int, epoch: Int, - partitionState: Map[Partition, PartitionStateInfo], + private def makeFollowers(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch], leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) { stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-follower transition for partitions %s") .format(localBrokerId, correlationId, controllerId, epoch, - partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","))) + partitionLeaderISRAndControllerEpochs.keySet.mkString(","))) - for (partition <- partitionState.keys) + for (partition <- partitionLeaderISRAndControllerEpochs.keys) responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) try { - replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))) + replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_))) stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d" - .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId)) + .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId)) - logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) => + logManager.truncateTo(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) => new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark }) stateChangeLogger.trace("Broker %d truncated logs and checkpoint recovery boundaries for partitions %s as per becoming-follower request from controller %d epoch %d" - .format(localBrokerId, partitionState.keySet.map(p => 
TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId)) + .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId)) if (!isShuttingDown.get()) { - replicaFetcherManager.addFetcherForPartitions(partitionState.map{ case(partition, partitionStateInfo) => - new TopicAndPartition(partition) -> - BrokerAndInitialOffset(leaders.find(_.id == partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader).get, - partition.getReplica().get.logEndOffset)} + replicaFetcherManager.addFetcherForPartitions(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) => + new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == leaderISRAndControllerEpoch.leaderAndIsr.leader).get, partition.getReplica().get.logEndOffset)} ) } else { @@ -363,11 +357,10 @@ class ReplicaManager(val config: KafkaConfig, .format(localBrokerId, correlationId, controllerId, epoch)) } - partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) => - partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)} + partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)} leaderPartitionsLock synchronized { - leaderPartitions --= partitionState.keySet + leaderPartitions --= partitionLeaderISRAndControllerEpochs.keySet } } catch { case e: Throwable => @@ -380,7 +373,7 @@ class ReplicaManager(val config: KafkaConfig, stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + "for the become-follower transition for partitions %s") - .format(localBrokerId, correlationId, controllerId, epoch, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","))) + .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(","))) } private def maybeShrinkIsr(): Unit = { diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala new file mode 100644 index 0000000..f00c081 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -0,0 +1,369 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools + +import joptsimple.OptionParser +import kafka.cluster.Broker +import kafka.message.{MessageSet, MessageAndOffset, ByteBufferMessageSet} +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.{AtomicReference, AtomicInteger} +import kafka.consumer.SimpleConsumer +import kafka.client.ClientUtils +import java.util.regex.{PatternSyntaxException, Pattern} +import kafka.utils.{SystemTime, Logging, ShutdownableThread, Pool} +import kafka.common.{ErrorMapping, TopicAndPartition} +import kafka.api._ + +/** + * For verifying the consistency among replicas. + * + * 1. start a fetcher on every broker. + * 2. each fetcher does the following + * 2.1 issues fetch request + * 2.2 puts the fetched result in a shared buffer + * 2.3 waits for all other fetchers to finish step 2.2 + * 2.4 one of the fetchers verifies the consistency of fetched results among replicas + * + * The consistency verification is up to the high watermark. The tool reports the + * max lag between the verified offset and the high watermark among all partitions. + * + * If a broker goes down, the verification of the partitions on that broker is delayed + * until the broker is up again. + * + * Caveats: + * 1. The tool needs all brokers to be up at startup time. + * 2. The tool doesn't handle out of range offsets. + */ + +object ReplicaVerificationTool extends Logging { + val clientId = "replicaVerificationTool" + + def main(args: Array[String]): Unit = { + val parser = new OptionParser + val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.") + .withRequiredArg + .describedAs("hostname:port,...,hostname:port") + .ofType(classOf[String]) + val fetchSizeOpt = parser.accepts("fetchsize", "The fetch size of each request.") + .withRequiredArg + .describedAs("fetchsize") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024 * 1024) + val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1000) + val topicWhiteListOpt = parser.accepts("topic-white-list", "White list of topics to verify replica consistency.
Defaults to all topics.") + .withRequiredArg() + .describedAs("Java regex (String)") + .ofType(classOf[String]) + .defaultsTo(".*") + val initialOffsetTimeOpt = parser.accepts("time", "Timestamp for getting the initial offsets.") + .withRequiredArg + .describedAs("timestamp/-1(latest)/-2(earliest)") + .ofType(classOf[java.lang.Long]) + .defaultsTo(-1L) + val reportIntervalOpt = parser.accepts("report-interval-ms", "The reporting interval.") + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Long]) + .defaultsTo(30 * 1000L) + + + val options = parser.parse(args : _*) + for(arg <- List(brokerListOpt)) { + if(!options.has(arg)) { + error("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + val regex = options.valueOf(topicWhiteListOpt) + .trim + .replace(',', '|') + .replace(" ", "") + .replaceAll("""^["']+""","") + .replaceAll("""["']+$""","") // property files may bring quotes + + try { + Pattern.compile(regex) + } + catch { + case e: PatternSyntaxException => + throw new RuntimeException(regex + " is an invalid regex.") + } + + val fetchSize = options.valueOf(fetchSizeOpt).intValue + val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue() + val initialOffsetTime = options.valueOf(initialOffsetTimeOpt).longValue() + val reportInterval = options.valueOf(reportIntervalOpt).longValue() + // getting topic metadata + info("Getting topic metadata...") + val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) + val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs) + val brokerMap = topicsMetadataResponse.extractBrokers(topicsMetadataResponse.topicsMetadata) + val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter( + topicMetadata => if (topicMetadata.topic.matches(regex)) true else false + ) + val topicPartitionReplicaList: Seq[(String, Int, Int)] = filteredTopicMetadata.flatMap( + topicMetadataResponse => + topicMetadataResponse.partitionsMetadata.flatMap( + partitionMetadata => + partitionMetadata.replicas.map(broker => (topicMetadataResponse.topic, partitionMetadata.partitionId, broker.id)) + ) + ) + debug("Selected topic partitions: " + topicPartitionReplicaList) + val topicAndPartitionsPerBroker: Map[Int, Seq[TopicAndPartition]] = topicPartitionReplicaList.groupBy(_._3) + .map{case (brokerId, partitions) => + brokerId -> partitions.map{ case partition => new TopicAndPartition(partition._1, partition._2)}} + debug("Topic partitions per broker: " + topicAndPartitionsPerBroker) + val expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int] = + topicPartitionReplicaList.groupBy(replica => new TopicAndPartition(replica._1, replica._2)) + .map{case (topicAndPartition, replicaSet) => topicAndPartition -> replicaSet.size} + debug("Expected replicas per topic partition: " + expectedReplicasPerTopicAndPartition) + val leadersPerBroker: Map[Int, Seq[TopicAndPartition]] = filteredTopicMetadata.flatMap( + topicMetadataResponse => + topicMetadataResponse.partitionsMetadata.map( + partitionMetadata => + (new TopicAndPartition(topicMetadataResponse.topic, partitionMetadata.partitionId), partitionMetadata.leader.get.id)) + ).groupBy(_._2) + .mapValues(topicAndPartitionAndLeaderIds => topicAndPartitionAndLeaderIds.map{ + case(topicAndPartition, leaderId) => topicAndPartition}) + debug("Leaders per broker: " + leadersPerBroker) + + val replicaBuffer = new ReplicaBuffer(expectedReplicasPerTopicAndPartition,
leadersPerBroker, + topicAndPartitionsPerBroker.size, + brokerMap, + initialOffsetTime, + reportInterval) + var doVerification = true + // create all replica fetcher threads + val fetcherThreads: Iterable[ReplicaFetcher] = topicAndPartitionsPerBroker.map{ + case (brokerId, topicAndPartitions) => + val replicaFetcher = new ReplicaFetcher(name = "ReplicaFetcher-" + brokerId, + sourceBroker = brokerMap(brokerId), + topicAndPartitions = topicAndPartitions, + replicaBuffer = replicaBuffer, + socketTimeout = 30000, + socketBufferSize = 256000, + fetchSize = fetchSize, + maxWait = maxWaitMs, + minBytes = 1, + doVerification = doVerification) + doVerification = false + replicaFetcher + } + + Runtime.getRuntime.addShutdownHook(new Thread() { + override def run() { + info("Stopping all fetchers") + fetcherThreads.foreach(_.shutdown()) + } + }) + fetcherThreads.foreach(_.start()) + println("Verification process is started.") + + } +} + +private case class ReplicaAndMessageIterator(replicaId: Int, iterator: Iterator[MessageAndOffset]) + +private case class MessageInfo(replicaId: Int, offset: Long, nextOffset: Long, checksum: Long) + +private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPartition, Int], + leadersPerBroker: Map[Int, Seq[TopicAndPartition]], + expectedNumFetchers: Int, + brokerMap: Map[Int, Broker], + initialOffsetTime: Long, + reportInterval: Long) extends Logging { + val fetchOffsetMap = new Pool[TopicAndPartition, Long] + val messageSetCache = new Pool[TopicAndPartition, Pool[Int, FetchResponsePartitionData]] + private val respondedNumBrokers = new AtomicInteger(0) + private val fetcherBarrier = new AtomicReference(new CountDownLatch(expectedNumFetchers)) + private val verificationBarrier = new AtomicReference(new CountDownLatch(1)) + @volatile private var lastReportTime = SystemTime.milliseconds + private var maxLag: Long = -1L + private var maxLagTopicAndPartition: TopicAndPartition = null + initialize() + + def createNewFetcherBarrier() { + fetcherBarrier.set(new CountDownLatch(expectedNumFetchers)) + } + + def getFetcherBarrier() = fetcherBarrier.get() + + def createNewVerificationBarrier() { + verificationBarrier.set(new CountDownLatch(1)) + } + + def getVerificationBarrier() = verificationBarrier.get() + + private def initialize() { + for (topicAndPartition <- expectedReplicasPerTopicAndPartition.keySet) + messageSetCache.put(topicAndPartition, new Pool[Int, FetchResponsePartitionData]) + setInitialOffsets() + } + + private def setInitialOffsets() { + for ((brokerId, topicAndPartitions) <- leadersPerBroker) { + val broker = brokerMap(brokerId) + val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100000, ReplicaVerificationTool.clientId) + val offsetMap: Map[TopicAndPartition, PartitionOffsetRequestInfo] = + topicAndPartitions.map(topicAndPartition => topicAndPartition -> PartitionOffsetRequestInfo(initialOffsetTime, 1)).toMap + val offsetRequest = OffsetRequest(offsetMap) + val offsetResponse = consumer.getOffsetsBefore(offsetRequest) + assert(!offsetResponse.hasError) + offsetResponse.partitionErrorAndOffsets.foreach{ + case (topicAndPartition, partitionOffsetResponse) => + fetchOffsetMap.put(topicAndPartition, partitionOffsetResponse.offsets.head) + } + } + } + + def verifyCheckSum() { + debug("Begin verification") + maxLag = -1L + for ((topicAndPartition, fetchResponsePerReplica) <- messageSetCache) { + debug("Verifying " + topicAndPartition) + assert(fetchResponsePerReplica.size == 
expectedReplicasPerTopicAndPartition(topicAndPartition)) + val messageIteratorMap = fetchResponsePerReplica.map{ + case(replicaId, fetchResponse) => + assert(fetchResponse.error == ErrorMapping.NoError) + replicaId -> fetchResponse.messages.asInstanceOf[ByteBufferMessageSet].shallowIterator} + val maxHw = fetchResponsePerReplica.values.map(_.hw).max + + var isDone = false + while (!isDone) { + var messageInfoFromFirstReplicaOpt: Option[MessageInfo] = None + for ( (replicaId, messageIterator) <- messageIteratorMap) { + if (messageIterator.hasNext) { + val messageAndOffset = messageIterator.next() + + // only verify up to the high watermark + if (messageAndOffset.offset >= fetchResponsePerReplica.get(replicaId).hw) + isDone = true + else { + messageInfoFromFirstReplicaOpt match { + case None => + messageInfoFromFirstReplicaOpt = Some( + MessageInfo(replicaId, messageAndOffset.offset, messageAndOffset.nextOffset, messageAndOffset.message.checksum)) + case Some(messageSetFromFirstReplica) => + if (messageSetFromFirstReplica.offset != messageAndOffset.offset) { + println("Partition " + topicAndPartition + ": replica " + messageSetFromFirstReplica.replicaId + + "'s offset " + messageSetFromFirstReplica.offset + " doesn't match replica " + + replicaId + "'s offset " + messageAndOffset.offset) + System.exit(1) + } + if (messageSetFromFirstReplica.checksum != messageAndOffset.message.checksum) + println("Partition " + topicAndPartition + " has unmatched checksum at offset " + messageAndOffset.offset + + "; replica " + messageSetFromFirstReplica.replicaId + "'s checksum " + messageSetFromFirstReplica.checksum + + "; replica " + replicaId + "'s checksum " + messageAndOffset.message.checksum) + } + } + } else + isDone = true + } + if (!isDone) { + val nextOffset = messageInfoFromFirstReplicaOpt.get.nextOffset + fetchOffsetMap.put(topicAndPartition, nextOffset) + debug(expectedReplicasPerTopicAndPartition(topicAndPartition) + " replicas match at offset " + + nextOffset + " for " + topicAndPartition) + } + } + if (maxHw - fetchOffsetMap.get(topicAndPartition) > maxLag) { + maxLag = maxHw - fetchOffsetMap.get(topicAndPartition) + maxLagTopicAndPartition = topicAndPartition + } + fetchResponsePerReplica.clear() + } + val currentTimeMs = SystemTime.milliseconds + if (currentTimeMs - lastReportTime > reportInterval) { + println("Time: " + currentTimeMs + ": max lag is " + maxLag + " for partition " + maxLagTopicAndPartition) + lastReportTime = currentTimeMs + } + } +} + +private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartitions: Iterable[TopicAndPartition], + replicaBuffer: ReplicaBuffer, socketTimeout: Int, socketBufferSize: Int, + fetchSize: Int, maxWait: Int, minBytes: Int, doVerification: Boolean) + extends ShutdownableThread(name) { + val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, ReplicaVerificationTool.clientId) + val fetchRequestBuilder = new FetchRequestBuilder(). + clientId(ReplicaVerificationTool.clientId). + replicaId(Request.DebuggingConsumerId). + maxWait(maxWait).
+ minBytes(minBytes) + val offsetMap = replicaBuffer.fetchOffsetMap + val messageSetCache = replicaBuffer.messageSetCache + + override def doWork() { + + val fetcherBarrier = replicaBuffer.getFetcherBarrier() + val verificationBarrier = replicaBuffer.getVerificationBarrier() + + for (topicAndPartition <- topicAndPartitions) + fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, + offsetMap.get(topicAndPartition), fetchSize) + + val fetchRequest = fetchRequestBuilder.build() + debug("Issuing fetch request " + fetchRequest) + + var response: FetchResponse = null + try { + response = simpleConsumer.fetch(fetchRequest) + } catch { + case t: Throwable => + if (!isRunning.get) + throw t + } + + if (response != null) { + response.data.foreach { + case(topicAndPartition, partitionData) => + messageSetCache.get(topicAndPartition).put(sourceBroker.id, partitionData) + } + } else { + for (topicAndPartition <- topicAndPartitions) + messageSetCache.get(topicAndPartition).put(sourceBroker.id, new FetchResponsePartitionData(messages = MessageSet.Empty)) + } + + fetcherBarrier.countDown() + debug("Done fetching") + + // wait for all fetchers to finish + fetcherBarrier.await() + debug("Ready for verification") + + // one of the fetchers will do the verification + if (doVerification) { + debug("Do verification") + replicaBuffer.verifyCheckSum() + replicaBuffer.createNewFetcherBarrier() + replicaBuffer.createNewVerificationBarrier() + debug("Created new barrier") + verificationBarrier.countDown() + } + + verificationBarrier.await() + debug("Done verification") + } +} \ No newline at end of file diff --git a/kafka-patch-review.py b/kafka-patch-review.py index 7fa6cb5..daf2c35 100644 --- a/kafka-patch-review.py +++ b/kafka-patch-review.py @@ -95,12 +95,12 @@ def main(): comment="Created reviewboard " if not opt.reviewboard: - print 'Created a new reviewboard ',rb_url, + print 'Created a new reviewboard ',rb_url,' against branch ',opt.branch else: - print 'Updated reviewboard' + print 'Updated reviewboard',opt.reviewboard comment="Updated reviewboard " - comment = comment + rb_url + ' against branch ' + opt.branch + comment = comment + rb_url jira.add_comment(opt.jira, comment) if __name__ == '__main__': diff --git a/project/Build.scala b/project/Build.scala index 40e0c4f..bcd1ca5 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -44,7 +44,7 @@ object KafkaBuild extends Build { crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2", "2.10.1"), excludeFilter in unmanagedSources <<= scalaVersion(v => if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala"), scalaVersion := "2.8.0", - version := "0.8.1", + version := "0.8.0", publishTo := Some("Apache Maven Repo" at "https://repository.apache.org/service/local/staging/deploy/maven2"), credentials += Credentials(Path.userHome / ".m2" / ".credentials"), buildNumber := System.getProperty("build.number", ""),
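
Usage sketch for the new kafka.tools.ReplicaVerificationTool added by this patch, using only the options its main() defines above. The broker address and topic regex are illustrative placeholders, and the wrapper object is hypothetical; in a deployment the tool would normally be launched via the distribution's run-class script rather than invoked directly.

// Sketch only: drives ReplicaVerificationTool.main() with placeholder values.
object ReplicaVerificationToolExample {
  def main(args: Array[String]) {
    kafka.tools.ReplicaVerificationTool.main(Array(
      "--broker-list", "localhost:9092",  // REQUIRED: brokers used to fetch topic metadata
      "--topic-white-list", "test-.*",    // verify only topics matching this regex (defaults to ".*")
      "--fetchsize", "1048576",           // bytes per fetch request (default 1024 * 1024)
      "--max-wait-ms", "1000",            // max wait per fetch request in ms (default 1000)
      "--report-interval-ms", "30000"))   // max-lag reporting interval in ms (default 30 * 1000)
    // The tool prints "Verification process is started.", then one ReplicaFetcher
    // thread per broker fetches and buffers messages; one fetcher cross-checks
    // offsets and checksums across replicas up to the high watermark and prints
    // the max lag each reporting interval.
  }
}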