diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 50c3a06..46bd175 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -23,7 +23,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException import kafka.api.{TopicMetadata, PartitionMetadata} import kafka.utils.{Logging, SystemTime, Utils, ZkUtils} import kafka.cluster.Broker -import collection.mutable.HashMap +import collection.mutable.{ListBuffer, HashMap} object AdminUtils extends Logging { val rand = new Random @@ -100,16 +100,16 @@ object AdminUtils extends Logging { val partitions = ZkUtils.getSortedPartitionIdsForTopic(zkClient, topic) val partitionMetadata = new Array[PartitionMetadata](partitions.size) - for (i <-0 until partitionMetadata.size) { - val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitions(i).toString)) - val inSyncReplicas = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionInSyncPath(topic, partitions(i).toString)) - val leader = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitions(i).toString)) + for (i <- 0 until partitionMetadata.size) { + val replicas = getReplicas(topic, partitions(i), zkClient) + val inSyncReplicas = getInSyncReplicas(topic, partitions(i), zkClient) + val leader = getReplicaLeader(topic, partitions(i), zkClient) debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader) partitionMetadata(i) = new PartitionMetadata(partitions(i), - if (leader == null) None else Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(leader.toInt)).head), - getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(replicas).map(id => id.toInt)), - getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(inSyncReplicas).map(id => id.toInt)), + leader.map(l => getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head), + getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas), + getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas), None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */) } Some(new TopicMetadata(topic, partitionMetadata)) @@ -120,6 +120,37 @@ object AdminUtils extends Logging { metadataList.toList } + def getAssignedReplicas(brokerId: Int, zkClient: ZkClient): Seq[Tuple2[String, Int]] = { + val replicaList = ListBuffer[Tuple2[String, Int]]() + val topics = ZkUtils.getTopics(zkClient) + for(topic <- topics) { + val partitions = ZkUtils.getSortedPartitionIdsForTopic(zkClient, topic) + for(partition <- partitions) { + val replicas = getReplicas(topic, partition, zkClient) + if(replicas.contains(brokerId)) { + // TODO: can the "contains" call above be replaced by a binary search for large replica lists? 
+ replicaList.append((topic, partition)) + } + } + } + replicaList + } + + def getReplicas(topic: String, partitionId: Int, zkClient: ZkClient): Seq[Int] = { + val replicasCSV = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitionId.toString)) + Utils.getCSVList(replicasCSV).map(_.toInt) + } + + def getInSyncReplicas(topic: String, partitionId: Int, zkClient: ZkClient): Seq[Int] = { + val inSyncReplicasCsv = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionInSyncPath(topic, partitionId.toString)) + Utils.getCSVList(inSyncReplicasCsv).map(_.toInt) + } + + def getReplicaLeader(topic: String, partitionId: Int, zkClient: ZkClient): Option[Int] = { + val leader = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString)) + Option(leader).map(_.toInt) + } + private def getBrokerInfoFromCache(zkClient: ZkClient, cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker], brokerIds: Seq[Int]): Seq[Broker] = { diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 8beaf01..b529be0 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -100,7 +100,7 @@ private[log] class LogSegment(val file: File, val messageSet: FileMessageSet, va * An append-only log for storing messages. */ @threadsafe -private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging { +private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging { /* A lock that guards all modifications to the log */ private val lock = new Object diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 409091f..db7efaa 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -126,7 +126,7 @@ private[kafka] class LogManager(val config: KafkaConfig, logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate) } - private def awaitStartup() { + private[kafka] def awaitStartup() { startupLatch.await } diff --git a/core/src/main/scala/kafka/replica/Partition.scala b/core/src/main/scala/kafka/replica/Partition.scala new file mode 100644 index 0000000..990b032 --- /dev/null +++ b/core/src/main/scala/kafka/replica/Partition.scala @@ -0,0 +1,101 @@ +package kafka.replica + +import kafka.api.ProducerRequest +import scala.collection.JavaConversions +import kafka.message.MessageSet +import kafka.utils.KafkaScheduler +import java.util.concurrent.{CountDownLatch, ConcurrentHashMap, LinkedBlockingQueue} + +/** + * The logical representation of a partition. If the partition has a leader that + * is local to the broker (leader.isLocal), then there are two threads that run: + * + * 1. A commit thread that runs and "commits" messages + * 2. 
A high watermark checkpoint thread that runs and periodically checkpoints the partition's high watermark to ZooKeeper
+ *
+ */
+class Partition( val topic: String,
+                 val partitionId: Int,
+                 var leader: Option[Replica],
+                 private val scheduler: KafkaScheduler = null ) {
+  val commitThreadName = "commit-thread-" + topic + "-" + partitionId
+  val hwCheckpointThreadName = "hw-checkpoint-thread-" + topic + "-" + partitionId
+
+  val assignedReplicas = JavaConversions.asMap(new ConcurrentHashMap[Int, Replica]())
+  val inSyncReplicas = JavaConversions.asMap(new ConcurrentHashMap[Int, Replica]())
+  val catchingUpReplicas = JavaConversions.asMap(new ConcurrentHashMap[Int, Replica]())
+  val reassignedReplicas = JavaConversions.asMap(new ConcurrentHashMap[Int, Replica]())
+
+  private var commitQueue: LinkedBlockingQueue[ProducerRequest] = null
+  @volatile private var commitThread: PartitionCommitThread = null
+
+  if(hasLocalLeader) {
+    commitQueue = new LinkedBlockingQueue[ProducerRequest]()
+    commitThread = new PartitionCommitThread()
+// scheduler.scheduleWithRate(checkpointHW, 0, 0)
+  }
+
+  def this(topic: String, partitionId: Int) = this(topic, partitionId, None)
+
+  def leader_=(newLeader: Replica) {
+    this.leader = Option(newLeader)
+    if(!newLeader.isLeader && this.leader.map(_.brokerId != newLeader.brokerId).getOrElse(false)) {
+      // leader is no longer local, stop commit thread
+      commitThread.shutdown()
+      // need to unschedule the checkpointing too, change to KafkaScheduler
+    }
+  }
+
+  def startCommitThread() {
+    //commitThread.map(_.start())
+  }
+
+  def startHWCheckpointThread() {
+    //hwCheckpointThread.map(_.start())
+  }
+
+  def hasLocalLeader = leader.map(_.isLocal).getOrElse(false)
+
+  def queueMessage(messages: MessageSet) {
+    if(hasLocalLeader) {
+      leader.map(_.appendMessages(messages))
+      //commitQueue.offer(messages)
+    } else {
+      //throw new NotLeaderException()
+    }
+  }
+
+  /**
+   * The thread that commits all messages that are written to the local log.
+   * This only runs if the leader is local.
+   */
+  // TODO: Fill this out in KAFKA-46
+  // TODO: this thread shutdown behaviour is pretty common, can we extract it?
+  class PartitionCommitThread() extends Thread(commitThreadName) {
+    @volatile var stopped = false
+    val shutdownLatch = new CountDownLatch(1)
+
+    def shutdown() {
+      stopped = true
+      interrupt()
+      shutdownLatch.await()
+    }
+
+    override def run() {
+      while(!stopped) {
+        val request = commitQueue.poll()
+
+      }
+      shutdownLatch.countDown()
+    }
+  }
+
+  /**
+   * The thread that periodically puts the partition's HW in ZK. Only exists if the
+   * local replica is the leader.
+   */
+  // TODO: Fill this out in KAFKA-46
+  private def checkpointHW() {
+
+  }
+}
diff --git a/core/src/main/scala/kafka/replica/Replica.scala b/core/src/main/scala/kafka/replica/Replica.scala
new file mode 100644
index 0000000..742c0ab
--- /dev/null
+++ b/core/src/main/scala/kafka/replica/Replica.scala
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.replica + +import kafka.log.Log +import kafka.cluster.Broker +import kafka.consumer.SimpleConsumer +import java.util.concurrent.{ConcurrentSkipListMap, CountDownLatch} +import collection.mutable.ListBuffer +import java.util.Collections +import collection.JavaConversions +import kafka.utils.{ZkUtils, Utils, Logging} +import org.I0Itec.zkclient.IZkDataListener +import kafka.message.MessageSet + +/** + * A replica within the system. A replica can either be local, in which case its + * "isLocal" variable is true and there is a local log, or the replica is remote + * and there is no log and "isLocal" is false. + * + * A local replica manages its log. A replica manages upto three listeners on ZK paths: + * 1. /brokers/topics/[topic]/partitions/[partitionId]/leader + * 2. /brokers/topics/[topic]/partitions/[partitionId]/replicas + * 3. /brokers/partition_reassignment/[topic]/partitions/[partitionId] + * + * The first is to be aware of leader changes. The second is to be aware when + * the list of assigned replicas is modified (usually to determine if we need to remove + * this replica from the broker). The last is only active if we're the leader of the + * partition. + * + */ +class Replica( val brokerId: Int, + val partition: Partition, + val logOption: Option[Log], + val replicaManager: ReplicaManager ) extends Logging { + val fetcherThreadName = "replica-fetcher-" + partition.topic + partition.partitionId + var started = false + @volatile var stopped = false + + val isLocal = logOption.isDefined + private[kafka] var highWaterMark = if(isLocal) logOption.get.getHighwaterMark else 0 + private[kafka] var logEndOffset = if(isLocal) logOption.get.nextAppendOffset else 0 + val waiters = new ConcurrentSkipListMap[Long, java.util.List[CountDownLatch]]() + private val leaderListener = new ZkLeaderChangeListener + private val changeListener = new ZkReplicaChangeListener + private val reassignmentListener = if(isLeader) Some(new ZkReassignmentListener) else None + + @volatile + private var replicaFetcher: Option[ReplicaFetcher] = None + + def this(brokerId: Int, partition: Partition, theLog: Log, replicaManager: ReplicaManager) = + this(brokerId, partition, Some(theLog), replicaManager) + + def this(brokerId: Int, partition: Partition, replicaManager: ReplicaManager) = + this(brokerId, partition, None, replicaManager) + + def isLeader = this == partition.leader + + def start() { + if(!started) { + replicaManager.zkClient.subscribeDataChanges(leaderListener.getZkPath, leaderListener) + replicaManager.zkClient.subscribeDataChanges(changeListener.getZkPath, changeListener) + reassignmentListener.map( listener => + replicaManager.zkClient.subscribeDataChanges(listener.getZkPath, listener) + ) + } + } + + def stop() { + if(!stopped) { + stopFetching() + replicaManager.zkClient.unsubscribeDataChanges(leaderListener.getZkPath, leaderListener) + replicaManager.zkClient.unsubscribeDataChanges(changeListener.getZkPath, changeListener) + reassignmentListener.map( listener => + replicaManager.zkClient.unsubscribeDataChanges(listener.getZkPath, listener) + ) + } + } + + def 
appendMessages(messageSet: MessageSet) { + if(!isLeader) { + throw new RuntimeException() + } else { + val log = logOption.get + log.append(messageSet) + setLogEndOffset(log.nextAppendOffset) + } + } + + def leaderChange() { + stopFetching() + startFetching() + } + + def waitForOffset(offset: Long): CountDownLatch = { + import JavaConversions._ + + if(logEndOffset > offset) { + new CountDownLatch(0) + } else { + val latch = new CountDownLatch(1) + val list = waiters.putIfAbsent(offset, Collections.synchronizedList(new ListBuffer[CountDownLatch]() += latch)) + if(list != null) { + list.add(latch) + } + latch + } + } + + private def stopFetching() { + replicaFetcher.map(_.shutdown()) + } + + private def startFetching() { + if(!isLeader && partition.leader.isDefined && replicaFetcher.map(!_.stopped).getOrElse(false)) { + val leaderBroker = ZkUtils.getBrokerInfoFromIds(replicaManager.zkClient, List(partition.leader.get.brokerId)) + replicaFetcher = Some(new ReplicaFetcher(leaderBroker.head)) + replicaFetcher.get.start() + } + } + + private def setLogEndOffset(offset: Long) { + import JavaConversions._ + + this.logEndOffset = offset + val validWaiters = this.waiters.headMap(offset, true) + validWaiters.values().foreach(_.foreach(_.countDown())) + validWaiters.clear() + } + + /** + * The thread that fetches data from the leader replica. It's only active + * for local replicas + * + */ + // TODO: Fill this out in KAFKA-46 + class ReplicaFetcher(val leader: Broker) extends Thread(fetcherThreadName) { + @volatile var stopped = false + val shutdownLatch = new CountDownLatch(1) + private val simpleConsumer = new SimpleConsumer(leader.host, leader.port, 100, 100) + // config.socketTimeoutMs, config.socketBufferSize) + + def shutdown() { + stopped = true + interrupt() + shutdownLatch.await() + } + + override def run() { + while(!stopped) { + + } + + shutdownLatch.countDown() + } + } + + /** + * A ZooKeeper listener that waits for changes to the leader for a given replica. + * + */ + private class ZkLeaderChangeListener extends IZkDataListener { + + def getZkPath = ZkUtils.getTopicPartitionLeaderPath(partition.topic, partition.partitionId.toString) + + def handleDataChange(path: String, data: AnyRef) { + val leader = data.asInstanceOf[String] + leaderChange(Some(leader)) + } + + def handleDataDeleted(path: String) { + leaderChange(None) + } + + private def leaderChange(newLeader: Option[String]) { + newLeader match { + case Some(leader) => + /* if the leader is this replica, we've already been bootstrapped as part of the election + * process and this listener was kicked off when the new replica (this one) was written to ZK. + * otherwise, we follow the new leader + */ + if(leader.toInt != brokerId) + replicaManager.becomeFollower(leader, Replica.this) + case None => + replicaManager.leaderElection(Replica.this) + } + } + } + + /** + * A ZooKeeper listener that waits for changes to the assigned replicas for + * a given partition. This exists in case a broker is unassigned from a + * given partition. 
+ * + */ + private class ZkReplicaChangeListener extends IZkDataListener { + + def getZkPath = ZkUtils.getTopicPartitionReplicasPath(partition.topic, partition.partitionId.toString) + + def handleDataChange(path: String, data: AnyRef) { + if(data != null) { + val replicaList = Utils.getCSVList(data.asInstanceOf[String]) + val assigned = replicaList.contains(replicaManager.getBrokerId.toString) + if(!assigned) replicaManager.removeReplica(partition.topic, partition.partitionId) + } + } + + def handleDataDeleted(path: String) { + replicaManager.removeReplica(partition.topic, partition.partitionId) + } + } + + /** + * The reassignment listener for *this* replica. Only exists if this replica is a leader + */ + private class ZkReassignmentListener extends IZkDataListener { + + def getZkPath = ZkUtils.getPartitionReassignmentPath(partition.topic, partition.partitionId.toString) + + def handleDataChange(path: String, data: AnyRef) { + val reassignedReplicas = data.asInstanceOf[String].split(",").map(_.toInt).toSet + if(partition.hasLocalLeader) { + val currentRar = partition.reassignedReplicas.keySet + val addedRar = reassignedReplicas -- currentRar + val removedRar = currentRar -- reassignedReplicas + addedRar.foreach( bId => partition.reassignedReplicas.put(bId, new Replica(bId, partition, replicaManager))) + removedRar.foreach( bId => partition.reassignedReplicas.remove(bId)) + } + } + + def handleDataDeleted(path: String) { + partition.reassignedReplicas.clear() + } + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/replica/ReplicaManager.scala b/core/src/main/scala/kafka/replica/ReplicaManager.scala new file mode 100644 index 0000000..936fa81 --- /dev/null +++ b/core/src/main/scala/kafka/replica/ReplicaManager.scala @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.replica + +import java.util.Random +import java.util.concurrent.TimeUnit +import kafka.admin.AdminUtils +import kafka.log.LogManager +import kafka.server.KafkaConfig +import kafka.utils._ +import collection.mutable.{HashSet, HashMap} +import collection.JavaConversions +import org.I0Itec.zkclient.IZkChildListener +import org.I0Itec.zkclient.exception.ZkNodeExistsException + +/** + * As the name suggests, this class is responsible for "managing" the replicas and partitions + * on this broker. It has functionality to perform partition mastership election, removal + * and addition of new partition replicas onto this broker and general bootstrap logic. + * + * A broker should have only one of these running at a time. The manager keeps holds + * listeners on these ZK paths: + * + * 1. /brokers/partitions_reassigned/topics/[topic]/partitions/[partitionId] + * 2. 
/brokers/assigned_partitions/[topic:partitionId] + * + */ +// TODO: Use KafkaZooKeeper instead of passing around the zkClient +class ReplicaManager( val config: KafkaConfig, + private val logManager: LogManager, + private val scheduler: KafkaScheduler ) extends Logging { + + logManager.awaitStartup() + + val zkClient = logManager.getZookeeperClient + private val replicas = new Pool[Tuple2[String, Int], Replica]() + private val partitions = new Pool[Tuple2[String, Int], Partition]() + private val random = new Random() + + // ------------- Public API ------------- // + + def getBrokerId = config.brokerId + + def getTopicMap: Map[String, List[Int]] = { + val topicMap = new HashMap[String, List[Int]]() + val topicPartitionTuples = partitions.keys + for( (topic, partition) <- topicPartitionTuples ) { + topicMap.get(topic) match { + case Some(partitionList) => topicMap.put(topic, partition :: partitionList) + case None => topicMap.put(topic, List(partition)) + } + } + topicMap.toMap + } + + // TODO: + // 1. Assigned Replicas should be directly retrieved from path2 in the comments + // 2. Need to add in the generic partition reassignment zk listener + def startup() { + info("Starting up replica manager on " + config.brokerId) + val assignedReplicas = AdminUtils.getAssignedReplicas(config.brokerId, zkClient) + for( (topic, partitionId) <- assignedReplicas ) { + val partition = new Partition(topic, partitionId) + val replica = new Replica(config.brokerId, partition, logManager.getOrCreateLog(topic, partitionId), this) + + // initialize the replica + replicaStateChange(replica) + + replicas.put((topic, partitionId), replica) + partitions.put((topic, partitionId), partition) + } + + // setup listener in ZK for partition assignment or changes + val partitionAssignmentListener = new ZkPartitionAssignmentListener() + zkClient.subscribeChildChanges(partitionAssignmentListener.getZkPath, partitionAssignmentListener) + + // setup listener in ZK for partition reassignment +// val replicaReassignmentListener = new ZkReplicaReassignmentListener(this, getTopicMap) +// val topics = zkClient.subscribeChildChanges(ZkUtils.BrokerPartitionReassignmentPath, replicaReassignmentListener) +// JavaConversions.asIterable(topics).foreach{ topic => +// val partitions = zkClient.subscribeChildChanges(ZkUtils.getPartitionsForReassignmentPath(topic), replicaReassignmentListener) +// JavaConversions.asIterable(partitions).foreach{ partition => +// zkClient.subscribeDataChanges(ZkUtils.getPartitionReassignmentPath(topic, partition), replicaReassignmentListener) +// } +// } + } + + def getPartition(topic: String, partId: Int): Option[Partition] = partitions.getOption((topic, partId)) + + def getPartitions: Set[Tuple2[String, Int]] = partitions.keys.toSet + + def getReplica(topic: String, partId: Int): Option[Replica] = replicas.getOption((topic, partId)) + + + // ------------- Private, internal API ------------- // + // Functionality to support replication behaviour below. 
These methods should not + // be used outside the replica manager class + + private[replica] def removeReplica(topic: String, partId: Int): Boolean = { + replicas.removeOption((topic, partId)) match { + case Some(replica) => replica.stop() + case None => + } + true + } + + private[replica] def addReplica(topic: String, partId: Int): Replica = { + (replicas.getOption((topic, partId)), partitions.getOption((topic, partId))) match { + case (None, None) => + val partition = new Partition(topic, partId) + val replica = new Replica(config.brokerId, partition, logManager.getOrCreateLog(topic, partId), this) + + // bootstrap + replicaStateChange(replica) + + replicas.put((topic, partId), replica) + partitions.put((topic, partId), partition) + replica + case (Some(replica),_) => replica + case (_, _) => throw new RuntimeException() + } + } + + // replica state changed is only called when attempting to initialize a replica + private[replica] def replicaStateChange(replica: Replica) { + val (topic, partitionId) = (replica.partition.topic, replica.partition.partitionId) + if(!replica.started) { + replica.start() + } + + // determine who the leader is, if there is one that isn't us we follow otherwise we attempt election + val leader = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString)) + if(leader != null) + becomeFollower(leader, replica) + else + leaderElection(replica) + } + + // follow the given leader + private[replica] def becomeFollower(leader: String, replica: Replica) { + replica.leaderChange() + + } + + private[replica] def becomeLeader(replica: Replica, inSyncReplicas: Seq[Replica]) { + val partition = replica.partition + val (topic, partitionId) = (partition.topic, partition.partitionId) + // stop HW checkpoint thread + + replica.highWaterMark = replica.logEndOffset + val newSynced = new HashMap[Int, Replica]() + for(syncedReplica <- inSyncReplicas) { + val synced = syncedReplica.waitForOffset(replica.highWaterMark).await(config.leaderElectionWaitTime, TimeUnit.MILLISECONDS) + if(synced) newSynced.put(syncedReplica.brokerId, syncedReplica) + } + newSynced.values.foreach(isr => partition.inSyncReplicas.put(isr.brokerId, isr)) + ZkUtils.updatePersistentPath( zkClient, + ZkUtils.getTopicPartitionInSyncPath(topic, partitionId.toString), + Utils.iterableToCSV(newSynced.keySet.map(_.toString)) ) + + val allReplicas = AdminUtils.getReplicas(topic, partitionId, zkClient).map(new Replica(_, partition, this)) + val catchingUpReplicas = allReplicas.filterNot(ar => newSynced.contains(ar.brokerId) || ar.brokerId == replica.brokerId) + // val reassigned = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.get) + allReplicas.foreach(ar => partition.assignedReplicas.put(ar.brokerId, ar)) + catchingUpReplicas.foreach(cur => partition.catchingUpReplicas.put(cur.brokerId, cur)) + + partition.leader = Some(replica) + partition.startCommitThread() + partition.startHWCheckpointThread() + + // write this replica as the leader in ZK + ZkUtils.updateEphemeralPath( zkClient, + ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString), + replica.brokerId.toString ) + } + + private[replica] def leaderElection(replica: Replica) { + val partition = replica.partition + val (topic, partitionId) = (partition.topic, partition.partitionId) + val inSyncReplicas = AdminUtils.getInSyncReplicas(topic, partitionId, zkClient) + if(inSyncReplicas.size == 0 || inSyncReplicas.contains(replica.brokerId)) { + if(inSyncReplicas.headOption.map(_ != replica.brokerId).getOrElse(false)) { + 
// wait a random amount of time if we're not preferred + val randomWait = random.nextInt(100) + info("Randomly waiting " + randomWait + " ms for leader election on replica " + (topic, partition)) + swallowWarn(Thread.sleep(randomWait)) + } + try { + info(replica.brokerId + ": Attempting to become leader for " + (topic, partitionId)) + ZkUtils.createEphemeralPathExpectConflict( + zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString), replica.brokerId.toString) + info(replica.brokerId + ": Successfully became leader for " + (topic, partitionId)) + becomeLeader(replica, inSyncReplicas.map(new Replica(_, partition, this))) + } catch { + case e: ZkNodeExistsException => + val leader = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString)) + info(replica.brokerId + ": Unsuccessful leader proposal, follow " + leader + " for " + (topic, partitionId)) + becomeFollower(leader, replica) + case e1 => throw e1 + } + } + } + + /** + * A ZooKeeper listener that listens for new partition assignments. Exactly one of these + * should exist for a broker. + * + */ + class ZkPartitionAssignmentListener extends IZkChildListener with Logging { + + def getZkPath = ZkUtils.getPartitionAssignmentPath(config.brokerId.toString) + + def handleChildChange(path: String, currChildren: java.util.List[String]) { + val newPartitions = new HashSet[Tuple2[String, Int]] + for( topicPartition <- JavaConversions.asIterable(currChildren) ) { + val parts = topicPartition.split(":").toList + if(parts.size != 2) { + warn("Error with assigned partition " + topicPartition) + } else { + newPartitions.add((parts(0), parts(1).toInt)) + } + } + val currentPartitions = getPartitions + val addedPartitions = newPartitions -- currentPartitions + addedPartitions.foreach(tuple => addReplica(tuple._1, tuple._2)) + } + } +} + +///** +// * A ZooKeeper listener that waits for partition reassignments for a given partition. 
+// * This is different than the one for the leading replica +// * +// */ +//private class ZkReplicaReassignmentListener(val replicaManager: ReplicaManager, val topicMap: Map[String, List[Int]]) +// extends IZkChildListener with IZkDataListener with Logging { +// +// val currTopics = new AtomicReference(topicMap.keys) +// val currTopicMap = new AtomicReference(topicMap.mapValues(_.toSet)) +// +// +// def getZkPath = null +// +// def handleChildChange(path: String, currChildren: java.util.List[String]) { +// if(path.endsWith("topics")) handleNewTopics(path, currChildren) +// else if(path.endsWith("partitions")) handleNewPartitions(path, currChildren) +// } +// +// def handleDataChange(path: String, data: AnyRef) { +// getTopicPartition(path) match { +// case Some((topic, partition)) => +// val reassignedReplicas = data.asInstanceOf[String].split(",").map(_.toInt).toSet +// case None => +// // skippitty +// } +// } +// +// def handleDataDeleted(path: String) { +// +// } +// +// private def getTopicPartition(path: String): Option[Tuple2[String, Int]] = { +// val parts = path.split("/") +// if(parts.size != 6) { +// None +// } else { +// Some((parts(2), parts(5).toInt)) +// } +// } +// +// private def handleNewTopics(path: String, topics: java.util.List[String]) { +// val newTopics = JavaConversions.asIterable(topics).toSet +// val previousTopics = currTopics.getAndSet(newTopics) +// val added = newTopics -- previousTopics +// added.foreach{topic => +// val parts = replicaManager.zkClient.subscribeChildChanges(ZkUtils.getPartitionsForReassignmentPath(topic), this) +// JavaConversions.asIterable(parts).foreach{ part => +// replicaManager.zkClient.subscribeChildChanges(ZkUtils.getPartitionReassignmentPath(topic, part), this) +// } +// } +// } +// +// private def handleNewPartitions(path: String, partitions: java.util.List[String]) { +// val newPartitions = JavaConversions.asIterable(partitions) +// for( partition <- newPartitions ) { +// if(replicaManager.getPartition()) +// } +// } +//} + + + + + + + + diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 03619c3..8b9091e 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -96,4 +96,7 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) { /* default replication factors for automatically created topics */ val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1) + + /* default amount of time the leader waits for replicas to sync up before considering it not in sync */ + val leaderElectionWaitTime = Utils.getInt(props, "default.leader.election.wait.time", 1000) } diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala index d62fa77..bbc18fd 100644 --- a/core/src/main/scala/kafka/utils/Pool.scala +++ b/core/src/main/scala/kafka/utils/Pool.scala @@ -39,8 +39,12 @@ class Pool[K,V] extends Iterable[(K, V)] { def get(key: K): V = pool.get(key) + def getOption(key: K): Option[V] = Option(get(key)) + def remove(key: K): V = pool.remove(key) + def removeOption(key: K): Option[V] = Option(remove(key)) + def keys = JavaConversions.asSet(pool.keySet()) def values: Iterable[V] = diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 43ec488..0cc6ba9 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -597,13 +597,21 @@ object 
Utils extends Logging { } def seqToCSV(seq: Seq[String]): String = { - var csvString = "" - for (i <- 0 until seq.size) { - if (i > 0) - csvString = csvString + ',' - csvString = csvString + seq(i) +// var csvString = "" +// for (i <- 0 until seq.size) { +// if (i > 0) +// csvString = csvString + ',' +// csvString = csvString + seq(i) +// } +// csvString + iterableToCSV(seq) + } + + def iterableToCSV(iterable: Iterable[String]): String = { + Option(iterable) match { + case Some(iter) => if(!iter.isEmpty) iter.tail.foldLeft(iter.head)(_ + "," + _) else "" + case None => "" } - csvString } def getTopicRentionHours(retentionHours: String) : Map[String, Int] = { diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 9599c15..6f62cdc 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -17,18 +17,20 @@ package kafka.utils -import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.serialize.ZkSerializer import kafka.cluster.{Broker, Cluster} import scala.collection._ import java.util.Properties import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError} import kafka.consumer.TopicCount +import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, ZkClient} object ZkUtils extends Logging { val ConsumersPath = "/consumers" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" + val BrokerPartitionAssignmentPath = "/brokers/assigned_partitions" + val BrokerPartitionReassignmentPath = "/brokers/reassigned_partitions/topics" def getTopicPath(topic: String): String ={ BrokerTopicsPath + "/" + topic @@ -58,10 +60,26 @@ object ZkUtils extends Logging { getTopicPartitionPath(topic, partitionId) + "/" + "leader" } + def getPartitionAssignmentPath(brokerId: String): String = { + BrokerPartitionAssignmentPath + "/" + brokerId + } + + def getPartitionsForReassignmentPath(topic: String): String = { + BrokerPartitionReassignmentPath + "/" + topic + } + + def getPartitionReassignmentPath(topic: String, partitionId: String): String = { + BrokerPartitionReassignmentPath + "/" + topic + "/partitions/" + partitionId + } + def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={ ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted } + def getTopics(zkClient: ZkClient): Seq[String] ={ + ZkUtils.getChildren(zkClient, ZkUtils.BrokerTopicsPath).sorted + } + def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) { val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id val broker = new Broker(id, creator, host, port) diff --git a/core/src/test/scala/unit/kafka/replica/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/replica/ReplicaManagerTest.scala new file mode 100644 index 0000000..3b57794 --- /dev/null +++ b/core/src/test/scala/unit/kafka/replica/ReplicaManagerTest.scala @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.replica + +import kafka.zk.ZooKeeperTestHarness +import org.scalatest.junit.JUnit3Suite +import kafka.server.KafkaConfig +import org.junit.{Test, Before, After} +import org.easymock.EasyMock +import kafka.admin.AdminUtils +import kafka.utils._ +import kafka.log.{Log, LogManager} +import org.I0Itec.zkclient.ZkClient +import junit.framework.Assert._ + + +class ReplicaManagerTest extends JUnit3Suite with ZooKeeperTestHarness { + + val time: MockTime = new MockTime() + var config:KafkaConfig = null + val zookeeperConnect = TestZKUtils.zookeeperConnect + var replicaManager: ReplicaManager = null + var zkClient: ZkClient = null + + @Before + override def setUp() { + super.setUp() + config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) + TestUtils.createBrokersInZk(zookeeper.client, List(0, 1)) + zkClient = zookeeper.client + } + + @After + override def tearDown() { + super.tearDown() + } + + @Test + def testReplicaManagerCanStartupCorrectly() { + /* setup ZK with proper replicas. + * the below will setup the following replica assignment + * broker-0 broker-1 broker-2 + * p0 p1 p2 (1st replica) + * p3 p4 p5 (1st replica) + * p2 p0 p1 (2nd replica) + * p4 p5 p3 (2nd replica) + */ + val replicaAssignment = AdminUtils.assignReplicasToBrokers(List("0", "1", "3"), 6, 2, 0) + AdminUtils.createReplicaAssignmentPathInZK("topic1", replicaAssignment, zkClient) + + // create the log manager and have them return the logs in the order + val logManager = getMockLogManagerReadyForStartup + EasyMock.expect(logManager.getOrCreateLog("topic1", 0)).andReturn(getDumbLog) + EasyMock.expect(logManager.getOrCreateLog("topic1", 3)).andReturn(getDumbLog) + EasyMock.expect(logManager.getOrCreateLog("topic1", 2)).andReturn(getDumbLog) + EasyMock.expect(logManager.getOrCreateLog("topic1", 4)).andReturn(getDumbLog) + EasyMock.replay(logManager) + + // perform startup + replicaManager = new ReplicaManager(config, logManager, null) + replicaManager.startup() + + // verify mock + EasyMock.verify(logManager) + + // no other brokers are up, broker 0 should be the leader for every replica + assertEquals(ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionLeaderPath("topic1", "0")), "0") + assertEquals(ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionLeaderPath("topic1", "3")), "0") + assertEquals(ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionLeaderPath("topic1", "2")), "0") + assertEquals(ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionLeaderPath("topic1", "4")), "0") + + assertEquals(None, replicaManager.getPartition("topic1", 1)) + } + + @Test + def testReplicaLeaderListener() { + val topic = "topic1" + val partId = "0" + val partition = new Partition(topic, partId.toInt) + + val replicaManager = EasyMock.createMock(classOf[ReplicaManager]) + val replica = new Replica(0, partition, getDumbLog, replicaManager) + + // setup replicaManager mock + EasyMock.expect(replicaManager.zkClient).andReturn(zkClient) + EasyMock.expectLastCall().times(2) // expect twice since the replica isn't the leader + EasyMock.replay(replicaManager) + + // this should setup ZK listeners + 
replica.start()
+    EasyMock.verify(replicaManager)
+
+    val newLeader = "1"
+    // change leader to a new one, expect ZK to call replicaManager
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.becomeFollower(newLeader, replica))
+    EasyMock.replay(replicaManager)
+
+    ZkUtils.updateEphemeralPath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partId), newLeader)
+    Thread.sleep(100)
+    EasyMock.verify(replicaManager)
+
+    // remove the leader ephemeral path altogether, should do leader election
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.leaderElection(replica))
+    EasyMock.replay(replicaManager)
+
+    ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partId))
+    Thread.sleep(100)
+    EasyMock.verify(replicaManager)
+  }
+
+
+  @Test
+  def testReplicaChangeListener() {
+    val topic = "topic1"
+    val partId = "0"
+    val partition = new Partition(topic, partId.toInt)
+
+    val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
+    val replica = new Replica(0, partition, getDumbLog, replicaManager)
+
+    // make replica list "0,1,2", three brokers (we are 0)
+    ZkUtils.updateEphemeralPath(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partId), "0,1,2")
+
+    // setup replicaManager mock
+    EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+    EasyMock.expectLastCall().times(2) // expect twice since the replica isn't the leader
+
+    // change list to remove broker 0 from the assigned replicas, this should delete the local replica
+    EasyMock.expect(replicaManager.getBrokerId).andReturn(config.brokerId)
+    EasyMock.expect(replicaManager.removeReplica(topic, partId.toInt)).andReturn(true)
+    EasyMock.replay(replicaManager)
+
+    replica.start()
+    ZkUtils.updateEphemeralPath(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partId), "1,2")
+    Thread.sleep(100)
+
+    EasyMock.verify(replicaManager)
+
+    // let's try deleting the path, this shouldn't invoke a ZK callback since the
+    // listener for broker 0 should have been removed
+    EasyMock.reset(replicaManager)
+    EasyMock.replay(replicaManager)
+    ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partId))
+    Thread.sleep(100)
+    EasyMock.verify(replicaManager)
+  }
+
+  def getDumbLog: Log = {
+    val log = EasyMock.createNiceMock(classOf[Log])
+    EasyMock.replay(log)
+    log
+  }
+
+  def getMockLogManagerReadyForStartup: LogManager = {
+    val logManager = EasyMock.createMock(classOf[LogManager])
+    EasyMock.checkOrder(logManager, true)
+    EasyMock.expect(logManager.awaitStartup())
+    EasyMock.expect(logManager.getZookeeperClient).andReturn(zkClient)
+    EasyMock.checkOrder(logManager, false)
+
+    logManager
+  }
+
+}
\ No newline at end of file
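
Reviewer note: the core of the new election protocol in ReplicaManager.leaderElection is a create-or-follow race on an ephemeral ZooKeeper node. A minimal sketch of that flow, using only ZkUtils helpers that appear in this patch; the wrapper object and the electOrFollow method name are invented here for illustration and are not part of the change.

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.utils.ZkUtils

object LeaderElectionSketch {
  // Hypothetical helper: condenses the create-or-follow decision that
  // ReplicaManager.leaderElection makes for a single topic/partition.
  // Returns the broker id of the resulting leader.
  def electOrFollow(zkClient: ZkClient, topic: String, partitionId: Int, brokerId: Int): Int = {
    val leaderPath = ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString)
    try {
      // First writer wins; the ephemeral node disappears when this broker's ZK session dies.
      ZkUtils.createEphemeralPathExpectConflict(zkClient, leaderPath, brokerId.toString)
      brokerId
    } catch {
      case e: ZkNodeExistsException =>
        // Lost the race: read the winner's broker id from the node and follow it.
        ZkUtils.readData(zkClient, leaderPath).toInt
    }
  }
}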
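
Reviewer note: Utils.seqToCSV now delegates to the new iterableToCSV, which replaces the old index loop with a foldLeft over the tail. A small standalone check of the intended behaviour; the fold body is copied from the patch, while the surrounding App object is only scaffolding for this example.

object IterableToCsvCheck extends App {
  // Same logic as the patched Utils.iterableToCSV: seed the fold with the head,
  // append ",element" for each remaining element, and tolerate null or empty input.
  def iterableToCSV(iterable: Iterable[String]): String =
    Option(iterable) match {
      case Some(iter) => if(!iter.isEmpty) iter.tail.foldLeft(iter.head)(_ + "," + _) else ""
      case None => ""
    }

  assert(iterableToCSV(Seq("1", "2", "3")) == "1,2,3")
  assert(iterableToCSV(Seq("7")) == "7")
  assert(iterableToCSV(Nil) == "")
  assert(iterableToCSV(null) == "")
  println("iterableToCSV matches the behaviour of the old seqToCSV loop")
}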