From 0a0a5ea0ea7d49162a58566df4e2ed44101e0e58 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Mon, 13 Jul 2015 23:15:39 -0700 Subject: [PATCH] KAFKA-2188 - JBOD Support --- core/src/main/scala/kafka/cluster/Partition.scala | 18 +- .../common/GenericKafkaStorageException.scala | 27 ++ .../scala/kafka/controller/KafkaController.scala | 123 ++++++- .../kafka/controller/PartitionLeaderSelector.scala | 79 +++++ core/src/main/scala/kafka/log/Log.scala | 385 +++++++++++---------- core/src/main/scala/kafka/log/LogManager.scala | 151 ++++++-- core/src/main/scala/kafka/log/LogSegment.scala | 16 +- core/src/main/scala/kafka/server/KafkaApis.scala | 5 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 11 +- .../main/scala/kafka/server/OffsetCheckpoint.scala | 39 ++- .../scala/kafka/server/ReplicaFetcherThread.scala | 7 +- .../main/scala/kafka/server/ReplicaManager.scala | 157 +++++++-- core/src/main/scala/kafka/utils/ZkUtils.scala | 43 ++- .../test/scala/unit/kafka/log/LogManagerTest.scala | 22 +- .../server/HighwatermarkPersistenceTest.scala | 9 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 2 +- 16 files changed, 773 insertions(+), 321 deletions(-) create mode 100644 core/src/main/scala/kafka/common/GenericKafkaStorageException.scala diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 2649090..6e08da5 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -89,7 +89,13 @@ class Partition(val topic: String, val config = LogConfig.fromProps(logManager.defaultConfig.originals, AdminUtils.fetchTopicConfig(zkClient, topic)) val log = logManager.createLog(TopicAndPartition(topic, partitionId), config) val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath) - val offsetMap = checkpoint.read + val offsetMap = + try { + checkpoint.read + } catch { + case e: IOException => throw new GenericKafkaStorageException( + log.dir.getParentFile, "Failed to read highwatermark for replica " + replicaId, e) + } if (!offsetMap.contains(TopicAndPartition(topic, partitionId))) info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId)) val offset = offsetMap.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset) @@ -140,14 +146,8 @@ class Partition(val topic: String, assignedReplicaMap.clear() inSyncReplicas = Set.empty[Replica] leaderReplicaIdOpt = None - try { - logManager.deleteLog(TopicAndPartition(topic, partitionId)) - removePartitionMetrics() - } catch { - case e: IOException => - fatal("Error deleting the log for partition [%s,%d]".format(topic, partitionId), e) - Runtime.getRuntime().halt(1) - } + logManager.deleteLog(TopicAndPartition(topic, partitionId)) + removePartitionMetrics() } } diff --git a/core/src/main/scala/kafka/common/GenericKafkaStorageException.scala b/core/src/main/scala/kafka/common/GenericKafkaStorageException.scala new file mode 100644 index 0000000..ccc5bd9 --- /dev/null +++ b/core/src/main/scala/kafka/common/GenericKafkaStorageException.scala @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.common + +import java.io.File + +/** + * Common exception and generally a wrapper for IOException that encapsulates a log file or logs directory where IO + * exception occurred + */ +case class GenericKafkaStorageException(dirOrFile: File, message: String, t: Throwable) + extends KafkaStorageException(message, t) { +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index b4fc755..a5352a8 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -53,6 +53,7 @@ class ControllerContext(val zkClient: ZkClient, var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet + var partitionsBeingRestarted: Map[String, Map[TopicAndPartition, Seq[Int]]] = mutable.Map.empty private var liveBrokersUnderlying: Set[Broker] = Set.empty private var liveBrokerIdsUnderlying: Set[Int] = Set.empty @@ -170,6 +171,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt private val partitionReassignedListener = new PartitionsReassignedListener(this) private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this) + private val restartPartitionsListener = new RestartPartitionsListener(this) private val isrChangeNotificationListener = new IsrChangeNotificationListener(this) newGauge( @@ -311,6 +313,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt registerReassignedPartitionsListener() registerIsrChangeNotificationListener() registerPreferredReplicaElectionListener() + registerRestartPartitionsListener() partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() initializeControllerContext() @@ -322,6 +325,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt brokerState.newState(RunningAsController) maybeTriggerPartitionReassignment() maybeTriggerPreferredReplicaElection() + maybeTriggerPartitionRestart() /* send partition leadership info to all live brokers */ sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) if (config.autoLeaderRebalanceEnable) { @@ -345,6 +349,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt deregisterIsrChangeNotificationListener() deregisterReassignedPartitionsListener() deregisterPreferredReplicaElectionListener() + deregisterRestartPartitionsListener() // shutdown delete topic manager if (deleteTopicManager != null) @@ -639,6 +644,43 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt } } + /** + * The goal of partition restart is to resume offlined partitions that encountered unrecoverable exceptions on + * another broker. 
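A restart is driven through ZooKeeper: a broker publishes a request under ZkUtils.RestartPartitionsPath and the RestartPartitionsListener added further down picks it up. A minimal sketch of the publishing side follows; the helper name, child-node prefix and JSON payload are assumptions — the real schema is whatever ZkUtils.parseRestartPartitions (not shown in this excerpt) expects. Only ZkUtils.RestartPartitionsPath and makeSurePersistentPathExists come from the codebase/patch.

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.ZkUtils

    object PartitionRestartRequestSketch {
      // Sketch only: ask the controller to restart the given partitions away from this broker.
      def requestPartitionRestart(zkClient: ZkClient, brokerId: Int, partitions: Seq[(String, Int)]): Unit = {
        ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.RestartPartitionsPath)
        // assumed payload shape; ZkUtils.parseRestartPartitions defines the real format
        val payload = partitions
          .map { case (topic, partition) => s"""{"topic":"$topic","partition":$partition,"broker":$brokerId}""" }
          .mkString("[", ",", "]")
        // a sequential child keeps requests ordered; the controller deletes the node once handled
        zkClient.createPersistentSequential(ZkUtils.RestartPartitionsPath + "/restart-", payload)
      }
    }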
+ * + * The steps to restart a partition are the following: + * 1. Move replicas to Offline state (in reality the replica at this time is already stopped by the broker, + * but we explicitly send StopReplica to ensure ReplicaStateMachine holds consistent states) + * + * 2. Move partition from Offline to Online state to trigger special leader selector + * which will ensure following replica remains in the followers set, leading replica resigns and + * becomes follower. + * + * 3. Move replicas to Online state. + */ + def initiatePartitionsRestart(partitionsToBeRestarted: Map[TopicAndPartition, Seq[Int]], zkPath: String): Unit = { + info("Starting partitions restart on brokers: " + partitionsToBeRestarted) + try { + val partitionAndReplicas = partitionsToBeRestarted.map { + case (TopicAndPartition(t, p), replicas) => + replicas.map(r => PartitionAndReplica(t, p, r)) + }.toSet.flatten + replicaStateMachine.handleStateChanges(partitionAndReplicas, OfflineReplica) + + val leaderSelector = new RestartedPartitionLeaderSelector(controllerContext, config, partitionsToBeRestarted.toMap) + deleteTopicManager.markTopicIneligibleForDeletion(partitionsToBeRestarted.keySet.map(_.topic)) + partitionStateMachine.handleStateChanges(partitionsToBeRestarted.keySet, OnlinePartition, leaderSelector) + + replicaStateMachine.handleStateChanges(partitionAndReplicas, OnlineReplica) + } catch { + case e: Throwable => error("Error restarting partitions %s".format(partitionsToBeRestarted), e) + } finally { + controllerContext.partitionsBeingRestarted -= zkPath + CoreUtils.swallowWarn(ZkUtils.deletePath(zkClient, zkPath)) + deleteTopicManager.resumeDeletionForTopics(partitionsToBeRestarted.keySet.map(_.topic)) + } + } + def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) { info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(","))) try { @@ -733,6 +775,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt initializePreferredReplicaElection() initializePartitionReassignment() initializeTopicDeletion() + initializePartitionRestart() info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds)) info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds)) info("Current list of topics in the cluster: %s".format(controllerContext.allTopics)) @@ -776,14 +819,22 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt info("Resuming reassignment of partitions: %s".format(partitionsToReassign.toString())) } + private def initializePartitionRestart(): Unit = { + controllerContext.partitionsBeingRestarted ++= ZkUtils.getPartitionsBeingRestarted(zkClient) + info("Resuming partitions being restarted: %s".format(controllerContext.partitionsBeingRestarted)) + } + private def initializeTopicDeletion() { val topicsQueuedForDeletion = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.DeleteTopicsPath).toSet val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case(partition, replicas) => replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic) val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic) val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic) + val 
topicsForWhichPartitionsRestartInProgress = controllerContext.partitionsBeingRestarted.values.flatMap{ + partitions => partitions.keySet.map(_.topic) + }.toSet val topicsIneligibleForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress | - topicsForWhichPreferredReplicaElectionIsInProgress + topicsForWhichPreferredReplicaElectionIsInProgress | topicsForWhichPartitionsRestartInProgress info("List of topics to be deleted: %s".format(topicsQueuedForDeletion.mkString(","))) info("List of topics ineligible for deletion: %s".format(topicsIneligibleForDeletion.mkString(","))) // initialize the topic deletion manager @@ -800,6 +851,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt onPreferredReplicaElection(controllerContext.partitionsUndergoingPreferredReplicaElection.toSet) } + def maybeTriggerPartitionRestart() { + controllerContext.partitionsBeingRestarted.toList.sortBy(_._1).foreach{ + case (childPath, partitions) => + initiatePartitionsRestart(partitions, childPath) + } + } + private def startChannelManager() { controllerContext.controllerChannelManager = new ControllerChannelManager(controllerContext, config) controllerContext.controllerChannelManager.startup() @@ -916,6 +974,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) } + private def registerRestartPartitionsListener() { + zkClient.subscribeChildChanges(ZkUtils.RestartPartitionsPath, restartPartitionsListener) + } + + private def deregisterRestartPartitionsListener() { + zkClient.subscribeChildChanges(ZkUtils.RestartPartitionsPath, restartPartitionsListener) + } + private def deregisterReassignedPartitionsListener() = { zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener) } @@ -1395,6 +1461,61 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD } } +/** + * Initiates partitions restart for partitions on specified brokers + */ +class RestartPartitionsListener(controller: KafkaController) extends IZkChildListener with Logging { + this.logIdent = "[RestartPartitionsListener on " + controller.config.brokerId + "]: " + val controllerContext = controller.controllerContext + val zkClient = controllerContext.zkClient + + /** + * @throws Exception + * On any error. + */ + @throws(classOf[Exception]) + def handleDataDeleted(dataPath: String) { + } + + /** + * Invoked when a partition was marked for restart + * @throws Exception On any error. 
+ */ + @throws(classOf[Exception]) + def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = { + import scala.collection.JavaConverters._ + val children = currentChilds.asScala + + trace("Got restart partition notification for children: " + children) + inLock(controllerContext.controllerLock) { + children.sorted.foreach { + childPath => + val fullChildPath = (parentPath + "/" + childPath).trim + + if (!controllerContext.partitionsBeingRestarted.contains(fullChildPath)) { + val (data, _) = ZkUtils.readData(zkClient, fullChildPath) + + debug("Handling restart partition for path %s with data %s".format(fullChildPath, data)) + val parsedPartitionsToBeRestarted = ZkUtils.parseRestartPartitions(data.toString).mapValues(Seq(_)) + val partitionsForTopicsToBeDeleted = parsedPartitionsToBeRestarted.keySet + .filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic)) + + if (partitionsForTopicsToBeDeleted.size > 0) { + debug("Skipping partitions restart for %s since the respective topics are being deleted" + .format(partitionsForTopicsToBeDeleted)) + } + val partitionsToBeRestarted = parsedPartitionsToBeRestarted.filterKeys(tap => !partitionsForTopicsToBeDeleted.contains(tap)) + debug("Partitions to be restarted on brokers: " + partitionsToBeRestarted) + controllerContext.partitionsBeingRestarted += (fullChildPath -> partitionsToBeRestarted) + } + } + controller.maybeTriggerPartitionRestart() + } + } + +} + + case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty, var isrChangeListener: ReassignedPartitionsIsrChangeListener = null) diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index bb6b5c8..c6221fb 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -200,6 +200,85 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) } /** + * Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest) for the partition + * that needs to be restarted in some broker - restarting broker. It is assumed that restarting broker does NOT + * belong to any "alive" broker set: + * 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live + * isr as the new isr. + * 2. Else, if unclean leader election for the topic is disabled, it throws a NoReplicaOnlineException. + * 3. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr. + * 4. 
If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException + * Assigned replicas remain the same - it includes restarting broker (unless some broker went down) + * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache + */ +class RestartedPartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig, partitionsToBeRestarted: Map[TopicAndPartition, Seq[Int]]) + extends PartitionLeaderSelector with Logging { + this.logIdent = "[RestartedPartitionLeaderSelector]: " + + def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { + val restartingBrokers = partitionsToBeRestarted(topicAndPartition) + + controllerContext.partitionReplicaAssignment.get(topicAndPartition) match { + case Some(assignedReplicas) => + def isBrokerAlive(brokerId: Int): Boolean = controllerContext.liveBrokerIds.contains(brokerId) + def isRestartingBroker(brokerId: Int): Boolean = restartingBrokers.contains(brokerId) + + val liveAssignedReplicas = assignedReplicas.filter(b => isBrokerAlive(b) && !isRestartingBroker(b)) + val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(b => isBrokerAlive(b) && !isRestartingBroker(b)) + val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch + val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion + + val newLeaderAndIsr = + + liveBrokersInIsr.isEmpty match { + case true => + // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration + // for unclean leader election. + if (!LogConfig.fromProps(config.originals(), AdminUtils.fetchTopicConfig(controllerContext.zkClient, + topicAndPartition.topic)).uncleanLeaderElectionEnable) { + throw new NoReplicaOnlineException(("No broker in ISR for partition " + + "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + + " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(","))) + } + + debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s" + .format(topicAndPartition, liveAssignedReplicas.mkString(","))) + liveAssignedReplicas.isEmpty match { + case true => + throw new NoReplicaOnlineException(("No replica for partition " + + "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + + " Assigned replicas are: [%s]".format(assignedReplicas)) + case false => + ControllerStats.uncleanLeaderElectionRate.mark() + val newLeader = liveAssignedReplicas.head + warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss." + .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(","))) + new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) + } + case false => + val currentLeader = currentLeaderAndIsr.leader + val newLeader = if (isBrokerAlive(currentLeader) && !isRestartingBroker(currentLeader)) { + // current leader is alive and didn't initiate partition restart - should be fine to continue leading partition + currentLeader + } else { + // current leader is either dead or lost partition - new leader is elected as preferred replica + liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r)).head + } + debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader." 
+ .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(","))) + new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1) + } + + info("Selected new leader and ISR %s for restarting partition %s".format(newLeaderAndIsr.toString(), topicAndPartition)) + // to restart the partition on a particular broker it needs to stay in AR list + (newLeaderAndIsr, assignedReplicas.filter(isBrokerAlive)) + case None => + throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition)) + } + } +} + +/** * Essentially does nothing. Returns the current leader and ISR, and the current * set of replicas assigned to a given topic/partition. */ diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index e5e8007..7a46265 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -125,115 +125,125 @@ class Log(val dir: File, /** The name of this log */ def name = dir.getName() + private def kafkaStorageCheck[T](message: => String)(fun: => T): T = { + try { + fun + } catch { + case e: IOException => + throw new GenericKafkaStorageException(this.dir, message, e) + } + } + /* Load the log segments from the log files on disk */ private def loadSegments() { - // create the log directory if it doesn't exist - dir.mkdirs() - var swapFiles = Set[File]() - - // first do a pass through the files in the log directory and remove any temporary files - // and find any interrupted swap operations - for(file <- dir.listFiles if file.isFile) { - if(!file.canRead) - throw new IOException("Could not read file " + file) - val filename = file.getName - if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) { - // if the file ends in .deleted or .cleaned, delete it - file.delete() - } else if(filename.endsWith(SwapFileSuffix)) { - // we crashed in the middle of a swap operation, to recover: - // if a log, delete the .index file, complete the swap operation later - // if an index just delete it, it will be rebuilt - val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) - if(baseName.getPath.endsWith(IndexFileSuffix)) { + kafkaStorageCheck("Failed to load segments from the disk") { + // create the log directory if it doesn't exist + dir.mkdirs() + var swapFiles = Set[File]() + + // first do a pass through the files in the log directory and remove any temporary files + // and find any interrupted swap operations + for (file <- dir.listFiles if file.isFile) { + if (!file.canRead) + throw new IOException("Could not read file " + file) + val filename = file.getName + if (filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) { + // if the file ends in .deleted or .cleaned, delete it file.delete() - } else if(baseName.getPath.endsWith(LogFileSuffix)){ - // delete the index - val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix)) - index.delete() - swapFiles += file + } else if (filename.endsWith(SwapFileSuffix)) { + // we crashed in the middle of a swap operation, to recover: + // if a log, delete the .index file, complete the swap operation later + // if an index just delete it, it will be rebuilt + val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) + if (baseName.getPath.endsWith(IndexFileSuffix)) { + file.delete() + } else if (baseName.getPath.endsWith(LogFileSuffix)) { + // delete the index + val 
index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix)) + index.delete() + swapFiles += file + } } } - } - // now do a second pass and load all the .log and .index files - for(file <- dir.listFiles if file.isFile) { - val filename = file.getName - if(filename.endsWith(IndexFileSuffix)) { - // if it is an index file, make sure it has a corresponding .log file - val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix)) - if(!logFile.exists) { - warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath)) - file.delete() - } - } else if(filename.endsWith(LogFileSuffix)) { - // if its a log file, load the corresponding log segment - val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong - val indexFile = Log.indexFilename(dir, start) - val segment = new LogSegment(dir = dir, - startOffset = start, - indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize, - rollJitterMs = config.randomSegmentJitter, - time = time, - fileAlreadyExists = true) - - if(indexFile.exists()) { - try { + // now do a second pass and load all the .log and .index files + for (file <- dir.listFiles if file.isFile) { + val filename = file.getName + if (filename.endsWith(IndexFileSuffix)) { + // if it is an index file, make sure it has a corresponding .log file + val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix)) + if (!logFile.exists) { + warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath)) + file.delete() + } + } else if (filename.endsWith(LogFileSuffix)) { + // if its a log file, load the corresponding log segment + val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong + val indexFile = Log.indexFilename(dir, start) + val segment = new LogSegment(dir = dir, + startOffset = start, + indexIntervalBytes = config.indexInterval, + maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, + time = time, + fileAlreadyExists = true) + + if (indexFile.exists()) { + try { segment.index.sanityCheck() - } catch { - case e: java.lang.IllegalArgumentException => - warn("Found an corrupted index file, %s, deleting and rebuilding index...".format(indexFile.getAbsolutePath)) - indexFile.delete() - segment.recover(config.maxMessageSize) + } catch { + case e: java.lang.IllegalArgumentException => + warn("Found an corrupted index file, %s, deleting and rebuilding index...".format(indexFile.getAbsolutePath)) + indexFile.delete() + segment.recover(config.maxMessageSize) + } } + else { + error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) + segment.recover(config.maxMessageSize) + } + segments.put(start, segment) } - else { - error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) - segment.recover(config.maxMessageSize) - } - segments.put(start, segment) } - } - - // Finally, complete any interrupted swap operations. To be crash-safe, - // log files that are replaced by the swap segment should be renamed to .deleted - // before the swap file is restored as the new segment file. 
- for (swapFile <- swapFiles) { - val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) - val fileName = logFile.getName - val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong - val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix) - val index = new OffsetIndex(file = indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) - val swapSegment = new LogSegment(new FileMessageSet(file = swapFile), - index = index, - baseOffset = startOffset, - indexIntervalBytes = config.indexInterval, - rollJitterMs = config.randomSegmentJitter, - time = time) - info("Found log file %s from interrupted swap operation, repairing.".format(swapFile.getPath)) - swapSegment.recover(config.maxMessageSize) - val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset) - replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true) - } - if(logSegments.size == 0) { - // no existing segments, create a new mutable segment beginning at offset 0 - segments.put(0L, new LogSegment(dir = dir, - startOffset = 0, - indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize, - rollJitterMs = config.randomSegmentJitter, - time = time, - fileAlreadyExists = false, - initFileSize = this.initFileSize(), - preallocate = config.preallocate)) - } else { - recoverLog() - // reset the index size of the currently active log segment to allow more entries - activeSegment.index.resize(config.maxIndexSize) - } + // Finally, complete any interrupted swap operations. To be crash-safe, + // log files that are replaced by the swap segment should be renamed to .deleted + // before the swap file is restored as the new segment file. 
+ for (swapFile <- swapFiles) { + val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) + val fileName = logFile.getName + val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong + val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix) + val index = new OffsetIndex(file = indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) + val swapSegment = new LogSegment(new FileMessageSet(file = swapFile), + index = index, + baseOffset = startOffset, + indexIntervalBytes = config.indexInterval, + rollJitterMs = config.randomSegmentJitter, + time = time) + info("Found log file %s from interrupted swap operation, repairing.".format(swapFile.getPath)) + swapSegment.recover(config.maxMessageSize) + val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset) + replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true) + } + if (logSegments.size == 0) { + // no existing segments, create a new mutable segment beginning at offset 0 + segments.put(0L, new LogSegment(dir = dir, + startOffset = 0, + indexIntervalBytes = config.indexInterval, + maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, + time = time, + fileAlreadyExists = false, + initFileSize = this.initFileSize(), + preallocate = config.preallocate)) + } else { + recoverLog() + // reset the index size of the currently active log segment to allow more entries + activeSegment.index.resize(config.maxIndexSize) + } + } } private def updateLogEndOffset(messageOffset: Long) { @@ -301,7 +311,7 @@ class Log(val dir: File, * @param messages The message set to append * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given * - * @throws KafkaStorageException If the append fails due to an I/O error. + * @throws GenericKafkaStorageException If the append fails due to an I/O error. * * @return Information about the appended messages including the first and last offset. 
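A note on the contract change documented here: append() (and the other operations wrapped in kafkaStorageCheck) now surfaces GenericKafkaStorageException, which carries the affected directory, instead of the caller halting the JVM on KafkaStorageException. An illustrative caller is sketched below; the exceptionHandler it delegates to is the one referenced by the ReplicaManager changes further down, whose implementation is not part of this excerpt.

    import kafka.common.GenericKafkaStorageException
    import kafka.log.Log
    import kafka.message.ByteBufferMessageSet
    import kafka.server.ReplicaManager

    object AppendErrorHandlingSketch {
      // Sketch only: route storage failures to the replica manager instead of Runtime.halt(1).
      def safeAppend(log: Log, messages: ByteBufferMessageSet, replicaManager: ReplicaManager): Unit = {
        try {
          log.append(messages)
        } catch {
          case e: GenericKafkaStorageException =>
            // e.dirOrFile identifies the log directory behind the failure
            replicaManager.exceptionHandler(e)
        }
      }
    }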
*/ @@ -315,18 +325,18 @@ class Log(val dir: File, // trim any invalid bytes or partial messages before appending it to the on-disk log var validMessages = trimInvalidBytes(messages, appendInfo) - try { - // they are valid, insert them in the log - lock synchronized { + // they are valid, insert them in the log + lock synchronized { + kafkaStorageCheck("I/O exception in append to log '%s'".format(name)) { appendInfo.firstOffset = nextOffsetMetadata.messageOffset if(assignOffsets) { // assign offsets to the message set val offset = new AtomicLong(nextOffsetMetadata.messageOffset) - try { - validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact) - } catch { - case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e) + kafkaStorageCheck("Error in validating messages while appending to log '%s'".format(name)) { + validMessages = + validMessages.validateMessagesAndAssignOffsets( + offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact) } appendInfo.lastOffset = offset.get - 1 } else { @@ -371,8 +381,6 @@ class Log(val dir: File, appendInfo } - } catch { - case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e) } } @@ -595,40 +603,42 @@ class Log(val dir: File, def roll(): LogSegment = { val start = time.nanoseconds lock synchronized { - val newOffset = logEndOffset - val logFile = logFilename(dir, newOffset) - val indexFile = indexFilename(dir, newOffset) - for(file <- List(logFile, indexFile); if file.exists) { - warn("Newly rolled segment file " + file.getName + " already exists; deleting it first") - file.delete() - } - - segments.lastEntry() match { - case null => - case entry => { - entry.getValue.index.trimToValidSize() - entry.getValue.log.trim() + kafkaStorageCheck("Failed to roll the log") { + val newOffset = logEndOffset + val logFile = logFilename(dir, newOffset) + val indexFile = indexFilename(dir, newOffset) + for (file <- List(logFile, indexFile); if file.exists) { + warn("Newly rolled segment file " + file.getName + " already exists; deleting it first") + file.delete() } + + segments.lastEntry() match { + case null => + case entry => { + entry.getValue.index.trimToValidSize() + entry.getValue.log.trim() + } + } + val segment = new LogSegment(dir, + startOffset = newOffset, + indexIntervalBytes = config.indexInterval, + maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, + time = time, + fileAlreadyExists = false, + initFileSize = initFileSize, + preallocate = config.preallocate) + val prev = addSegment(segment) + if (prev != null) + throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset)) + + // schedule an asynchronous flush of the old segment + scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L) + + info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0 * 1000.0))) + + segment } - val segment = new LogSegment(dir, - startOffset = newOffset, - indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize, - rollJitterMs = config.randomSegmentJitter, - time = time, - fileAlreadyExists = false, - initFileSize = initFileSize, - preallocate = config.preallocate) - val prev = addSegment(segment) - if(prev != null) - throw new KafkaException("Trying to roll a new log 
segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset)) - - // schedule an asynchronous flush of the old segment - scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L) - - info("Rolled new log segment for '" + name + "' in %.0f ms.".format((System.nanoTime - start) / (1000.0*1000.0))) - - segment } } @@ -647,12 +657,14 @@ class Log(val dir: File, * @param offset The offset to flush up to (non-inclusive); the new recovery point */ def flush(offset: Long) : Unit = { - if (offset <= this.recoveryPoint) + if(offset <= this.recoveryPoint) return debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " + time.milliseconds + " unflushed = " + unflushedMessages) - for(segment <- logSegments(this.recoveryPoint, offset)) - segment.flush() + kafkaStorageCheck("Failed to flush the log") { + for (segment <- logSegments(this.recoveryPoint, offset)) + segment.flush() + } lock synchronized { if(offset > this.recoveryPoint) { this.recoveryPoint = offset @@ -666,10 +678,12 @@ class Log(val dir: File, */ private[log] def delete() { lock synchronized { - removeLogMetrics() - logSegments.foreach(_.delete()) - segments.clear() - CoreUtils.rm(dir) + kafkaStorageCheck("Failed to delete log") { + removeLogMetrics() + logSegments.foreach(_.delete()) + segments.clear() + CoreUtils.rm(dir) + } } } @@ -686,14 +700,16 @@ class Log(val dir: File, return } lock synchronized { - if(segments.firstEntry.getValue.baseOffset > targetOffset) { - truncateFullyAndStartAt(targetOffset) - } else { - val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) - deletable.foreach(deleteSegment(_)) - activeSegment.truncateTo(targetOffset) - updateLogEndOffset(targetOffset) - this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) + kafkaStorageCheck("Failed to truncate log") { + if (segments.firstEntry.getValue.baseOffset > targetOffset) { + truncateFullyAndStartAt(targetOffset) + } else { + val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) + deletable.foreach(deleteSegment(_)) + activeSegment.truncateTo(targetOffset) + updateLogEndOffset(targetOffset) + this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) + } } } } @@ -705,19 +721,21 @@ class Log(val dir: File, private[log] def truncateFullyAndStartAt(newOffset: Long) { debug("Truncate and start log '" + name + "' to " + newOffset) lock synchronized { - val segmentsToDelete = logSegments.toList - segmentsToDelete.foreach(deleteSegment(_)) - addSegment(new LogSegment(dir, - newOffset, - indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize, - rollJitterMs = config.randomSegmentJitter, - time = time, - fileAlreadyExists = false, - initFileSize = initFileSize, - preallocate = config.preallocate)) - updateLogEndOffset(newOffset) - this.recoveryPoint = math.min(newOffset, this.recoveryPoint) + kafkaStorageCheck("Failed to fully truncate log") { + val segmentsToDelete = logSegments.toList + segmentsToDelete.foreach(deleteSegment(_)) + addSegment(new LogSegment(dir, + newOffset, + indexIntervalBytes = config.indexInterval, + maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, + time = time, + fileAlreadyExists = false, + initFileSize = initFileSize, + preallocate = config.preallocate)) + updateLogEndOffset(newOffset) + this.recoveryPoint = math.min(newOffset, this.recoveryPoint) + } } } @@ -815,22 +833,25 @@ class Log(val dir: File, */ private[log] 
def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile : Boolean = false) { lock synchronized { - // need to do this in two phases to be crash safe AND do the delete asynchronously - // if we crash in the middle of this we complete the swap in loadSegments() - if (!isRecoveredSwapFile) - newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix) - addSegment(newSegment) - - // delete the old files - for(seg <- oldSegments) { - // remove the index entry - if(seg.baseOffset != newSegment.baseOffset) - segments.remove(seg.baseOffset) - // delete segment - asyncDeleteSegment(seg) + kafkaStorageCheck("Failed to replace old segments") { + // need to do this in two phases to be crash safe AND do the delete asynchronously + // if we crash in the middle of this we complete the swap in loadSegments() + if (!isRecoveredSwapFile) + newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix) + addSegment(newSegment) + + // delete the old files + for (seg <- oldSegments) { + // remove the index entry + if (seg.baseOffset != newSegment.baseOffset) + segments.remove(seg.baseOffset) + // delete segment + asyncDeleteSegment(seg) + } + + // okay we are safe now, remove the swap suffix + newSegment.changeFileSuffixes(Log.SwapFileSuffix, "") } - // okay we are safe now, remove the swap suffix - newSegment.changeFileSuffixes(Log.SwapFileSuffix, "") } } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 69386c1..2387591 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -21,22 +21,22 @@ import java.io._ import java.util.concurrent.TimeUnit import kafka.utils._ import scala.collection._ -import kafka.common.{TopicAndPartition, KafkaException} +import kafka.common.{GenericKafkaStorageException, TopicAndPartition, KafkaException} import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint} import java.util.concurrent.{Executors, ExecutorService, ExecutionException, Future} /** * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. * All read and write operations are delegated to the individual log instances. - * + * * The log manager maintains logs in one or more directories. New logs are created in the data directory * with the fewest logs. No attempt is made to move partitions after the fact or balance based on * size or I/O rate. - * + * * A background thread handles log retention by periodically truncating excess log segments. 
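The first change below turns logDirs from an immutable val into a @volatile var so that a failed JBOD volume can be dropped at runtime (see offlineDisk later in this file); outside readers go through the new getLogDirs accessor. A trivial sketch of why callers should re-read the list rather than cache the startup-time array — the helper name is hypothetical:

    import kafka.log.LogManager

    object LogDirLivenessSketch {
      // Sketch only: a directory present at startup may have been taken offline since.
      def isDirectoryStillOnline(logManager: LogManager, path: String): Boolean =
        logManager.getLogDirs.exists(_.getAbsolutePath == path)
    }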
*/ @threadsafe -class LogManager(val logDirs: Array[File], +class LogManager(@volatile private var logDirs: Array[File], val topicConfigs: Map[String, LogConfig], val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig, @@ -47,6 +47,9 @@ class LogManager(val logDirs: Array[File], scheduler: Scheduler, val brokerState: BrokerState, private val time: Time) extends Logging { + + def getLogDirs = this.logDirs + val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" val LockFile = ".lock" val InitialTaskDelayMs = 30*1000 @@ -55,7 +58,21 @@ class LogManager(val logDirs: Array[File], createAndValidateLogDirs(logDirs) private val dirLocks = lockLogDirs(logDirs) - private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap + @volatile private var recoveryPointCheckpoints = createRecoveryPointCheckpoints() + + private def createRecoveryPointCheckpoints(): Map[File, OffsetCheckpoint] = { + logDirs.map( + dir => + try { + (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile))) + } catch { + case e: IOException => + throw new GenericKafkaStorageException(dir, "Failed to create recovery point checkpoints at " + dir, e) + } + + ).toMap + } + loadLogs() // public, so we can access this from kafka.admin.DeleteTopicTest @@ -64,13 +81,13 @@ class LogManager(val logDirs: Array[File], new LogCleaner(cleanerConfig, logDirs, logs, time = time) else null - + /** * Create and check validity of the given directories, specifically: *
   * <ol>
   * <li> Ensure that there are no duplicates in the directory list
   * <li> Create each directory if it doesn't exist
-  * <li> Check that each path is a readable directory 
+  * <li> Check that each path is a readable directory
   * </ol>
*/ private def createAndValidateLogDirs(dirs: Seq[File]) { @@ -87,7 +104,7 @@ class LogManager(val logDirs: Array[File], throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.") } } - + /** * Lock all the given directories */ @@ -95,12 +112,12 @@ class LogManager(val logDirs: Array[File], dirs.map { dir => val lock = new FileLock(new File(dir, LockFile)) if(!lock.tryLock()) - throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath + + throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath + ". A Kafka instance in another process or thread is using this directory.") lock } } - + /** * Recover and load all logs in the given data directories */ @@ -130,6 +147,9 @@ class LogManager(val logDirs: Array[File], try { recoveryPoints = this.recoveryPointCheckpoints(dir).read } catch { + case e: IOException => + throw new GenericKafkaStorageException( + dir, "Failed to read recovery point checkpoints from the directory " + dir, e) case e: Exception => { warn("Error occured while reading recovery-point-offset-checkpoint file of directory " + dir, e) warn("Resetting the recovery checkpoint to 0") @@ -186,16 +206,16 @@ class LogManager(val logDirs: Array[File], /* Schedule the cleanup task to delete old logs */ if(scheduler != null) { info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs)) - scheduler.schedule("kafka-log-retention", - cleanupLogs, - delay = InitialTaskDelayMs, - period = retentionCheckMs, + scheduler.schedule("kafka-log-retention", + cleanupLogs, + delay = InitialTaskDelayMs, + period = retentionCheckMs, TimeUnit.MILLISECONDS) info("Starting log flusher with a default period of %d ms.".format(flushCheckMs)) - scheduler.schedule("kafka-log-flusher", - flushDirtyLogs, - delay = InitialTaskDelayMs, - period = flushCheckMs, + scheduler.schedule("kafka-log-flusher", + flushDirtyLogs, + delay = InitialTaskDelayMs, + period = flushCheckMs, TimeUnit.MILLISECONDS) scheduler.schedule("kafka-recovery-point-checkpoint", checkpointRecoveryPointOffsets, @@ -310,11 +330,13 @@ class LogManager(val logDirs: Array[File], } /** - * Write out the current recovery point for all logs to a text file in the log directory + * Write out the current recovery point for all logs to a text file in the log directory * to avoid recovering the whole log on startup. 
*/ def checkpointRecoveryPointOffsets() { - this.logDirs.foreach(checkpointLogsInDir) + logCreationOrDeletionLock synchronized { + this.logDirs.foreach(checkpointLogsInDir) + } } /** @@ -323,7 +345,12 @@ class LogManager(val logDirs: Array[File], private def checkpointLogsInDir(dir: File): Unit = { val recoveryPoints = this.logsByDir.get(dir.toString) if (recoveryPoints.isDefined) { - this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) + try { + this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint)) + } catch { + case e: IOException => + throw new GenericKafkaStorageException(dir, "Failed to store recovery points", e) + } } } @@ -345,24 +372,24 @@ class LogManager(val logDirs: Array[File], def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = { logCreationOrDeletionLock synchronized { var log = logs.get(topicAndPartition) - + // check if the log has already been created in another thread if(log != null) return log - + // if not, create it val dataDir = nextLogDir() val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition) dir.mkdirs() - log = new Log(dir, + log = new Log(dir, config, recoveryPoint = 0L, scheduler, time) logs.put(topicAndPartition, log) info("Created log for partition [%s,%d] in %s with properties {%s}." - .format(topicAndPartition.topic, - topicAndPartition.partition, + .format(topicAndPartition.topic, + topicAndPartition.partition, dataDir.getAbsolutePath, {import JavaConversions._; config.originals.mkString(", ")})) log @@ -392,6 +419,52 @@ class LogManager(val logDirs: Array[File], } /** + * Remove disk from the pool and shutdown all operations related to the + * directory and underlying logs + * + * @param disk logs directory to be be put to offline + */ + def offlineDisk(disk: File): Unit ={ + debug("Putting disk " + disk.getAbsolutePath + " to offline") + logCreationOrDeletionLock synchronized { + + recoveryPointCheckpoints = recoveryPointCheckpoints.filterNot { + case (file, _) => file.getParentFile == disk + } + + for ((tap, _) <- logsByDir(disk.toString)) { + offlineLog(tap) + } + + logDirs = logDirs.filterNot(_ == disk) + } + } + + /** + * Offline log but don't delete physical entries + */ + def offlineLog(topicAndPartition: TopicAndPartition) { + var removedLog: Log = null + logCreationOrDeletionLock synchronized { + + removedLog = logs.remove(topicAndPartition) + + if (removedLog != null) { + //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. + if (cleaner != null) { + cleaner.abortAndPauseCleaning(topicAndPartition) + } + } + CoreUtils.swallowTrace(removedLog.delete()) + } + debug("Offlined log for partition [%s,%d] in %s." + .format(topicAndPartition.topic, + topicAndPartition.partition, + removedLog.dir.getAbsolutePath)) + } + + + /** * Choose the next directory in which to create a log. Currently this is done * by calculating the number of partitions in each directory and then choosing the * data directory with the fewest partitions. 
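offlineDisk() above expects one of the configured data directories, while a GenericKafkaStorageException may carry either a partition directory or the data directory itself, so a caller would first map the failed path back to the containing log dir. A minimal sketch, with a hypothetical helper name; only getLogDirs, offlineDisk and the exception type come from the patch:

    import kafka.common.GenericKafkaStorageException
    import kafka.log.LogManager

    object OfflineDiskSketch {
      // Sketch only: resolve the configured data directory that contains the failed path,
      // then remove it from the pool and offline its logs.
      def offlineFailedDisk(logManager: LogManager, e: GenericKafkaStorageException): Unit = {
        val failedPath = e.dirOrFile.getAbsolutePath
        logManager.getLogDirs
          .find(dir => failedPath.startsWith(dir.getAbsolutePath))
          .foreach(logManager.offlineDisk)
      }
    }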
@@ -404,7 +477,7 @@ class LogManager(val logDirs: Array[File], val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size) val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap var dirCounts = (zeros ++ logCounts).toBuffer - + // choose the directory with the least logs in it val leastLoaded = dirCounts.sortBy(_._2).head new File(leastLoaded._1) @@ -468,7 +541,7 @@ class LogManager(val logDirs: Array[File], /** * Map of log dir to logs by topic and partitions in that dir */ - private def logsByDir = { + def logsByDir = { this.logsByTopicPartition.groupBy { case (_, log) => log.dir.getParent } @@ -480,16 +553,18 @@ class LogManager(val logDirs: Array[File], private def flushDirtyLogs() = { debug("Checking for dirty logs to flush...") - for ((topicAndPartition, log) <- logs) { - try { - val timeSinceLastFlush = time.milliseconds - log.lastFlushTime - debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + log.config.flushMs + - " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush) - if(timeSinceLastFlush >= log.config.flushMs) - log.flush - } catch { - case e: Throwable => - error("Error flushing topic " + topicAndPartition.topic, e) + logCreationOrDeletionLock synchronized { + for ((topicAndPartition, log) <- logs) { + try { + val timeSinceLastFlush = time.milliseconds - log.lastFlushTime + debug("Checking if flush is needed on " + topicAndPartition.topic + " flush interval " + log.config.flushMs + + " last flushed " + log.lastFlushTime + " time since last flush: " + timeSinceLastFlush) + if (timeSinceLastFlush >= log.config.flushMs) + log.flush + } catch { + case e: Throwable => + error("Error flushing topic " + topicAndPartition.topic, e) + } } } } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 1377e8f..2afe640 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -16,13 +16,13 @@ */ package kafka.log +import java.io.{File, IOException} + import kafka.message._ -import kafka.common._ +import kafka.server.{FetchDataInfo, LogOffsetMetadata} import kafka.utils._ -import kafka.server.{LogOffsetMetadata, FetchDataInfo} import scala.math._ -import java.io.File /** @@ -256,10 +256,10 @@ class LogSegment(val log: FileMessageSet, def changeFileSuffixes(oldSuffix: String, newSuffix: String) { val logRenamed = log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) if(!logRenamed) - throw new KafkaStorageException("Failed to change the log file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset)) + throw new IOException("Failed to change the log file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset)) val indexRenamed = index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix))) if(!indexRenamed) - throw new KafkaStorageException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset)) + throw new IOException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset)) } /** @@ -272,15 +272,15 @@ class LogSegment(val log: FileMessageSet, /** * Delete this log segment from the filesystem. - * @throws KafkaStorageException if the delete fails. + * @throws IOException if the delete fails. 
*/ def delete() { val deletedLog = log.delete() val deletedIndex = index.delete() if(!deletedLog && log.file.exists) - throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.") + throw new IOException("Delete of log " + log.file.getName + " failed.") if(!deletedIndex && index.file.exists) - throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.") + throw new IOException("Delete of index " + index.file.getName + " failed.") } /** diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 18f5b5b..fc80b46 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -113,9 +113,8 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, leaderAndIsrResponse))) } catch { - case e: KafkaStorageException => - fatal("Disk error during leadership change.", e) - Runtime.getRuntime.halt(1) + case e: GenericKafkaStorageException => + replicaManager.exceptionHandler(e) } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index dbe170f..da9f86e 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -26,10 +26,11 @@ import kafka.consumer.ConsumerConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} import kafka.utils.CoreUtils import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef} +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} import org.apache.kafka.common.metrics.MetricsReporter import org.apache.kafka.common.protocol.SecurityProtocol -import scala.collection.{mutable, immutable, JavaConversions, Map} + +import scala.collection.{JavaConversions, Map, immutable} object Defaults { /** ********* Zookeeper Configuration ***********/ @@ -398,10 +399,10 @@ object KafkaConfig { private val configDef = { + import ConfigDef.Importance._ import ConfigDef.Range._ - import ConfigDef.ValidString._ import ConfigDef.Type._ - import ConfigDef.Importance._ + import ConfigDef.ValidString._ new ConfigDef() @@ -586,7 +587,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka /** ********* Log Configuration ***********/ val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp) val numPartitions = getInt(KafkaConfig.NumPartitionsProp) - val logDirs = CoreUtils.parseCsvList( Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp))) + @volatile var logDirs = CoreUtils.parseCsvList( Option(getString(KafkaConfig.LogDirsProp)).getOrElse(getString(KafkaConfig.LogDirProp))) val logSegmentBytes = getInt(KafkaConfig.LogSegmentBytesProp) val logFlushIntervalMessages = getLong(KafkaConfig.LogFlushIntervalMessagesProp) val logCleanerThreads = getInt(KafkaConfig.LogCleanerThreadsProp) diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala index 8c5b054..e1d298d 100644 --- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala +++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala @@ -17,7 +17,7 @@ package kafka.server import scala.collection._ -import kafka.utils.Logging +import kafka.utils.{CoreUtils, Logging} import kafka.common._ import 
java.io._ @@ -34,9 +34,10 @@ class OffsetCheckpoint(val file: File) extends Logging { // write to temp file and then swap with the existing file val temp = new File(file.getAbsolutePath + ".tmp") - val fileOutputStream = new FileOutputStream(temp) - val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream)) + var writer: BufferedWriter = null try { + val fileOutputStream = new FileOutputStream(temp) + writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream)) // write the current version writer.write(0.toString) writer.newLine() @@ -54,24 +55,29 @@ class OffsetCheckpoint(val file: File) extends Logging { // flush the buffer and then fsync the underlying file writer.flush() fileOutputStream.getFD().sync() + + // swap new offset checkpoint file with previous one + if(!temp.renameTo(file)) { + // renameTo() fails on Windows if the destination file exists. + file.delete() + if(!temp.renameTo(file)) + throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath)) + } } finally { - writer.close() + if (writer != null) + CoreUtils.swallow(writer.close()) + } - // swap new offset checkpoint file with previous one - if(!temp.renameTo(file)) { - // renameTo() fails on Windows if the destination file exists. - file.delete() - if(!temp.renameTo(file)) - throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath)) - } + } } def read(): Map[TopicAndPartition, Long] = { lock synchronized { - val reader = new BufferedReader(new FileReader(file)) + var reader: BufferedReader = null try { + reader = new BufferedReader(new FileReader(file)) var line = reader.readLine() if(line == null) return Map.empty @@ -87,7 +93,7 @@ class OffsetCheckpoint(val file: File) extends Logging { while(line != null) { val pieces = line.split("\\s+") if(pieces.length != 3) - throw new IOException("Malformed line in offset checkpoint file: '%s'.".format(line)) + throw new IllegalStateException("Malformed line in offset checkpoint file: '%s'.".format(line)) val topic = pieces(0) val partition = pieces(1).toInt @@ -96,13 +102,14 @@ class OffsetCheckpoint(val file: File) extends Logging { line = reader.readLine() } if(offsets.size != expectedSize) - throw new IOException("Expected %d entries but found only %d".format(expectedSize, offsets.size)) + throw new IllegalStateException("Expected %d entries but found only %d".format(expectedSize, offsets.size)) offsets case _ => - throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version) + throw new IllegalStateException("Unrecognized version of the highwatermark checkpoint file: " + version) } } finally { - reader.close() + if(reader != null) + CoreUtils.swallow(reader.close()) } } } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index c89d00b..7df0048 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -22,7 +22,7 @@ import kafka.cluster.BrokerEndPoint import kafka.log.LogConfig import kafka.message.ByteBufferMessageSet import kafka.api.{OffsetRequest, FetchResponsePartitionData} -import kafka.common.{KafkaStorageException, TopicAndPartition} +import kafka.common.{GenericKafkaStorageException, KafkaStorageException, TopicAndPartition} class ReplicaFetcherThread(name:String, sourceBroker: BrokerEndPoint, @@ -63,9 +63,8 @@ class 
ReplicaFetcherThread(name:String, trace("Follower %d set replica high watermark for partition [%s,%d] to %s" .format(replica.brokerId, topic, partitionId, followerHighWatermark)) } catch { - case e: KafkaStorageException => - fatal("Disk error while replicating data.", e) - Runtime.getRuntime.halt(1) + case e: GenericKafkaStorageException => + replicaMgr.exceptionHandler(e) } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 795220e..1e25e45 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -16,30 +16,24 @@ */ package kafka.server -import kafka.api._ -import kafka.common._ -import kafka.utils._ +import java.io.{File, IOException} +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean + +import com.yammer.metrics.core.Gauge +import kafka.api.{PartitionFetchInfo, ProducerResponseStatus, _} import kafka.cluster.{BrokerEndPoint, Partition, Replica} -import kafka.log.{LogAppendInfo, LogManager} -import kafka.metrics.KafkaMetricsGroup +import kafka.common.{TopicAndPartition, _} import kafka.controller.KafkaController +import kafka.log.{LogAppendInfo, LogManager} import kafka.message.{ByteBufferMessageSet, MessageSet} -import kafka.api.ProducerResponseStatus -import kafka.common.TopicAndPartition -import kafka.api.PartitionFetchInfo - +import kafka.metrics.KafkaMetricsGroup +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.protocol.Errors -import java.util.concurrent.atomic.AtomicBoolean -import java.io.{IOException, File} -import java.util.concurrent.TimeUnit - -import scala.Some import scala.collection._ -import org.I0Itec.zkclient.ZkClient -import com.yammer.metrics.core.Gauge - /* * Result metadata of a log append operation on the log */ @@ -112,7 +106,7 @@ class ReplicaManager(val config: KafkaConfig, private val replicaStateChangeLock = new Object val replicaFetcherManager = new ReplicaFetcherManager(config, this) private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) - val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap + var highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap private var hwThreadInitialized = false this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: " val stateChangeLogger = KafkaController.stateChangeLogger @@ -122,11 +116,26 @@ class ReplicaManager(val config: KafkaConfig, val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch]( purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests) + private def createHighWatermarkCheckpoints(): Map[String, OffsetCheckpoint] = { + config.logDirs.map( + dir => + try { + (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename))) + } catch { + case e: IOException => + throw new GenericKafkaStorageException(new File(dir), "Failed to create highwatermark checkpoint file in directory " + + dir, e) + } + ).toMap + } + + val exceptionHandler = new ExceptionHandler + newGauge( "LeaderCount", new Gauge[Int] { def value = { - getLeaderPartitions().size + getLeaderPartitions().size } } ) @@ -142,11 +151,11 @@ class ReplicaManager(val config: KafkaConfig, def 
value = underReplicatedPartitionCount() } ) - val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS) - val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS) + val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS) + val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS) def underReplicatedPartitionCount(): Int = { - getLeaderPartitions().count(_.isUnderReplicated) + getLeaderPartitions().count(_.isUnderReplicated) } def startHighWaterMarksCheckPointThread() = { @@ -183,7 +192,7 @@ class ReplicaManager(val config: KafkaConfig, scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS) } - def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = { + def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = { stateChangeLogger.trace("Broker %d handling stop replica (delete=%s) for partition [%s,%d]".format(localBrokerId, deletePartition.toString, topic, partitionId)) val errorCode = ErrorMapping.NoError @@ -201,7 +210,7 @@ class ReplicaManager(val config: KafkaConfig, val topicAndPartition = TopicAndPartition(topic, partitionId) if(logManager.getLog(topicAndPartition).isDefined) { - logManager.deleteLog(topicAndPartition) + logManager.deleteLog(topicAndPartition) } } stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker" @@ -258,7 +267,7 @@ class ReplicaManager(val config: KafkaConfig, throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d]".format(config.brokerId, topic, partition)) } - def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica = { + def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica = { val partitionOpt = getPartition(topic, partitionId) partitionOpt match { case None => @@ -273,7 +282,7 @@ class ReplicaManager(val config: KafkaConfig, } } - def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = { + def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = { val partitionOpt = getPartition(topic, partitionId) partitionOpt match { case None => None @@ -396,10 +405,12 @@ class ReplicaManager(val config: KafkaConfig, } catch { // NOTE: Failed produce requests metric is not incremented for known exceptions // it is supposed to indicate un-expected failures of a broker in handling a produce request - case e: KafkaStorageException => - fatal("Halting due to unrecoverable I/O error while handling produce request: ", e) - Runtime.getRuntime.halt(1) - (topicAndPartition, null) + case e: GenericKafkaStorageException => + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() + BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() + error("Error processing append operation on partition %s".format(topicAndPartition), e) + exceptionHandler(e) + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) case utpe: UnknownTopicOrPartitionException => (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe))) case nle: NotLeaderForPartitionException => @@ -408,7 +419,7 @@ class ReplicaManager(val config: KafkaConfig, (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtle))) case mstle: MessageSetSizeTooLargeException => 
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle))) - case imse : InvalidMessageSizeException => + case imse: InvalidMessageSizeException => (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse))) case t: Throwable => BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() @@ -432,7 +443,7 @@ class ReplicaManager(val config: KafkaConfig, val isFromFollower = replicaId >= 0 val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId - val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId) + val fetchOnlyCommitted: Boolean = !Request.isValidBrokerId(replicaId) // read from local logs val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo) @@ -527,7 +538,7 @@ class ReplicaManager(val config: KafkaConfig, LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(nle)) case rnae: ReplicaNotAvailableException => LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(rnae)) - case oor : OffsetOutOfRangeException => + case oor: OffsetOutOfRangeException => LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(oor)) case e: Throwable => BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() @@ -832,11 +843,10 @@ class ReplicaManager(val config: KafkaConfig, for((dir, reps) <- replicasByDir) { val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark.messageOffset)).toMap try { - highWatermarkCheckpoints(dir).write(hwms) + highWatermarkCheckpoints.get(dir).foreach(_.write(hwms)) } catch { case e: IOException => - fatal("Error writing to highwatermark file: ", e) - Runtime.getRuntime().halt(1) + exceptionHandler(new GenericKafkaStorageException(new File(dir), "Failed to store highwatermark in dir " + dir, e)) } } } @@ -851,4 +861,79 @@ class ReplicaManager(val config: KafkaConfig, checkpointHighWatermarks() info("Shut down completely") } + + + class ExceptionHandler extends (Throwable => Unit) with Logging { + this.logIdent = "[ExceptionHandler on Broker " + localBrokerId + "]: " + + override def apply(ex: Throwable): Unit = { + debug("Received exception to handle: " + ex) + replicaStateChangeLock synchronized { + ex match { + case GenericKafkaStorageException(unavailableFile, _, _) => + // determine whether the unavailable path is a whole log directory (disk) or a single partition's log directory + if (logManager.getLogDirs.contains(unavailableFile)) { + offlineDisk(unavailableFile) + } else { + val logOption = logManager.allLogs().find(l => l.dir == unavailableFile) + logOption match { + case Some(log) => + offlineDisk(log.dir.getParentFile) + case None => + warn("File " + unavailableFile + " is neither a disk nor a log directory") + } + } + } + } + } + + /** + * To take the disk offline: + * 1. Stop updating highWatermark checkpoints + * 2. Update config.logDirs so new partitions are not created there + * 3. Request LogManager to take the disk offline - stop all IO operations on it and remove the disk and its underlying + partitions from the pool of managed directories and logs + * 4. Restart partitions that were stored under the unavailable disk - notify the controller through a zk path + */ + private def offlineDisk(disk: File): Unit = { + warn("Entire directory " + disk + " is not available. 
Taking the directory offline and restarting the underlying partitions.") + + removeOfflineHighWaterMarkCheckPoints(Seq(disk.getAbsolutePath)) + + val affectedPartitions = logManager.logsByDir(disk.getAbsolutePath).keySet + debug("Affected partitions: " + affectedPartitions + " will be restarted") + + config.logDirs = logManager.getLogDirs.filterNot(_ == disk).map(_.getAbsolutePath) + trace("Config log dirs changed to " + config.logDirs) + + logManager.offlineDisk(disk) + replicaFetcherManager.removeFetcherForPartitions(affectedPartitions) + for (topicAndPartition <- affectedPartitions) { + stopReplica(topicAndPartition.topic, topicAndPartition.partition, deletePartition = true) + } + + if (logManager.getLogDirs.isEmpty) { + fatal("The last available disk has been taken offline") + Runtime.getRuntime.halt(1) + } + + requestPartitionsRestart(affectedPartitions) + } + + private def requestPartitionsRestart(taps: Set[TopicAndPartition]): Unit = { + trace("Requesting restart of partitions " + taps) + + val partitions = taps.map(tap => Map("topic" -> tap.topic, "partition" -> tap.partition)) + val data = Json.encode(Map("version" -> 1, "broker" -> localBrokerId, "partitions" -> partitions)) + ZkUtils.createSequentialPersistentPath(zkClient, ZkUtils.RestartPartitionsPath + "/" + ZkUtils.RestartPartitionsZnodePrefix, data) + } + + private def removeOfflineHighWaterMarkCheckPoints(dirs: Seq[String]): Unit = { + trace("Removing highWatermark checkpoints for offline directories: " + dirs) + replicaStateChangeLock synchronized { + highWatermarkCheckpoints = highWatermarkCheckpoints.filterNot { case (dir, _) => dirs.contains(dir)} + } + } + } + } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 166814c..2134a43 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -48,6 +48,9 @@ object ZkUtils extends Logging { val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election" val BrokerSequenceIdPath = "/brokers/seqid" val IsrChangeNotificationPath = "/isr_change_notification" + val RestartPartitionsPath = "/restart_partitions" + val RestartPartitionsZnodePrefix = "restart_request" + def getTopicPath(topic: String): String = { BrokerTopicsPath + "/" + topic @@ -94,7 +97,7 @@ object ZkUtils extends Logging { def setupCommonPaths(zkClient: ZkClient) { for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, - DeleteTopicsPath, BrokerSequenceIdPath)) + DeleteTopicsPath, BrokerSequenceIdPath, RestartPartitionsPath)) makeSurePersistentPathExists(zkClient, path) } @@ -676,6 +679,42 @@ object ZkUtils extends Logging { } } + def getPartitionsBeingRestarted(zkClient: ZkClient): Map[String, Map[TopicAndPartition, Seq[Int]]] = { + import scala.collection.JavaConverters._ + val restartPaths = zkClient.getChildren(RestartPartitionsPath).asScala + restartPaths.map { + restartChildPath => + val fullChildPath = (RestartPartitionsPath + "/" + restartChildPath).trim + val jsonOpt = readDataMaybeNull(zkClient, fullChildPath)._1 + (fullChildPath, jsonOpt match { + case Some(json) => + parseRestartPartitions(json).mapValues(Seq(_)) + case None => + Map.empty[TopicAndPartition, Seq[Int]] + }) + }.toMap + } + + def parseRestartPartitions(jsonData: String): Map[TopicAndPartition, Int] = { + Json.parseFull(jsonData) match { + case Some(m) => + val data = m.asInstanceOf[Map[String, Any]] + val broker = data("broker").asInstanceOf[Int] + 
data.get("partitions") match { + case Some(partitionsSeq) => + partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => { + val topic = p.get("topic").get.asInstanceOf[String] + val partition = p.get("partition").get.asInstanceOf[Int] + TopicAndPartition(topic, partition) -> broker + }).toMap + case None => + Map.empty + } + case None => + Map.empty + } + } + def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) { val brokerIdPath = BrokerIdsPath + "/" + brokerId zkClient.delete(brokerIdPath) @@ -785,7 +824,7 @@ object ZkUtils extends Logging { def createZkClient(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = { val zkClient = new ZkClient(zkUrl, sessionTimeout, connectionTimeout, ZKStringSerializer) - zkClient + zkClient } } diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index a13f2be..2cd70d2 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -46,17 +46,17 @@ class LogManagerTest extends JUnit3Suite { logDir = TestUtils.tempDir() logManager = createLogManager() logManager.startup - logDir = logManager.logDirs(0) + logDir = logManager.getLogDirs(0) } override def tearDown() { if(logManager != null) logManager.shutdown() CoreUtils.rm(logDir) - logManager.logDirs.foreach(CoreUtils.rm(_)) + logManager.getLogDirs.foreach(CoreUtils.rm(_)) super.tearDown() } - + /** * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log. 
*/ @@ -92,9 +92,9 @@ class LogManagerTest extends JUnit3Suite { offset = info.lastOffset } assertTrue("There should be more than one segment now.", log.numberOfSegments > 1) - + log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds)) - + time.sleep(maxLogAgeMs + 1) assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments) time.sleep(log.config.fileDeleteDelayMs + 1) @@ -177,15 +177,15 @@ class LogManagerTest extends JUnit3Suite { time.sleep(logManager.InitialTaskDelayMs) assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime) } - + /** * Test that new logs that are created are assigned to the least loaded log directory */ @Test def testLeastLoadedAssignment() { // create a log manager with multiple data directories - val dirs = Array(TestUtils.tempDir(), - TestUtils.tempDir(), + val dirs = Array(TestUtils.tempDir(), + TestUtils.tempDir(), TestUtils.tempDir()) logManager.shutdown() logManager = createLogManager() @@ -198,7 +198,7 @@ class LogManagerTest extends JUnit3Suite { assertTrue("Load should balance evenly", counts.max <= counts.min + 1) } } - + /** * Test that it is not possible to open two log managers using the same data directory */ @@ -208,7 +208,7 @@ class LogManagerTest extends JUnit3Suite { createLogManager() fail("Should not be able to create a second log manager instance with the same data directory") } catch { - case e: KafkaException => // this is good + case e: KafkaException => // this is good } } diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala index 60cd824..50a1558 100755 --- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala +++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala @@ -37,10 +37,10 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { logDirs = config.logDirs.map(new File(_)).toArray, cleanerConfig = CleanerConfig()) } - + @After def teardown() { - for(manager <- logManagers; dir <- manager.logDirs) + for(manager <- logManagers; dir <- manager.getLogDirs) CoreUtils.rm(dir) } @@ -48,7 +48,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { // mock zkclient val zkClient = EasyMock.createMock(classOf[ZkClient]) EasyMock.replay(zkClient) - + // create kafka scheduler val scheduler = new KafkaScheduler(2) scheduler.startup @@ -138,9 +138,8 @@ class HighwatermarkPersistenceTest extends JUnit3Suite { replicaManager.shutdown(false) } - def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = { replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs(0)).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L) } - + } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 17e9fe4..b168cee 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -855,7 +855,7 @@ object TestUtils extends Logging { servers.forall(server => topicAndPartitions.forall(tp => server.getLogManager().getLog(tp).isEmpty))) // ensure that topic is removed from all cleaner offsets TestUtils.waitUntilTrue(() => servers.forall(server => topicAndPartitions.forall { tp => - val checkpoints = server.getLogManager().logDirs.map { logDir => + val checkpoints = server.getLogManager().getLogDirs.map { logDir => new 
OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read() } checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp)) -- 2.3.2 (Apple Git-55)
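
Note on the offline-disk handling added in ReplicaManager above: the new ExceptionHandler resolves the file carried by a GenericKafkaStorageException to the log directory (disk) that should be taken offline, either the configured log dir itself or the parent of the failing partition's log. Below is a minimal standalone sketch of that resolution step, not the patch's code: it assumes a plain Set[File] of configured log directories instead of the real LogManager, and the names OfflineDirResolver and resolveOfflineDir are illustrative only.

    import java.io.File

    object OfflineDirResolver {
      // Given the configured log directories and the file reported by a storage
      // exception, pick the whole log directory that should be taken offline.
      // Assumption: a partition directory sits directly under its log directory,
      // so a parent-path check stands in for the LogManager lookup in the patch.
      def resolveOfflineDir(logDirs: Set[File], unavailable: File): Option[File] = {
        if (logDirs.contains(unavailable))
          Some(unavailable)                       // the path is already a log dir root
        else if (unavailable.getParentFile != null && logDirs.contains(unavailable.getParentFile))
          Some(unavailable.getParentFile)         // a partition dir: offline its parent disk
        else
          None                                    // neither a disk nor a log directory
      }

      def main(args: Array[String]): Unit = {
        val dirs = Set(new File("/data/kafka-logs-1"), new File("/data/kafka-logs-2"))
        // A failure inside a partition directory is promoted to its parent log directory.
        println(resolveOfflineDir(dirs, new File("/data/kafka-logs-1/test-topic-0")))
        // A path outside all configured log directories is reported as unknown.
        println(resolveOfflineDir(dirs, new File("/tmp/unrelated")))
      }
    }

Unlike the patch, which scans logManager.allLogs() for an exact directory match, the sketch uses a simple parent-directory check; it is only meant to show how a partition-level failure gets promoted to a whole-disk offline decision.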
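When a disk is taken offline, requestPartitionsRestart publishes a sequential znode under /restart_partitions whose JSON payload names the reporting broker and the affected partitions, and the controller-side parseRestartPartitions turns that payload back into a Map[TopicAndPartition, Int] keyed by partition with the broker id as value. The following standalone sketch shows only the payload shape implied by the patch; the hand-rolled encode method is an illustrative stand-in for kafka.utils.Json.encode, and TopicAndPartition here is a local stand-in case class, not the kafka.common class.

    // Local stand-in for kafka.common.TopicAndPartition (illustration only).
    case class TopicAndPartition(topic: String, partition: Int)

    object RestartPartitionsPayload {
      // Builds JSON in the layout written by requestPartitionsRestart:
      // a version, the reporting broker, and the partitions to restart.
      def encode(brokerId: Int, partitions: Seq[TopicAndPartition]): String = {
        val parts = partitions
          .map(tp => s"""{"topic":"${tp.topic}","partition":${tp.partition}}""")
          .mkString("[", ",", "]")
        s"""{"version":1,"broker":$brokerId,"partitions":$parts}"""
      }

      def main(args: Array[String]): Unit = {
        // Example: broker 2 lost the disk holding test-0 and test-1.
        println(encode(2, Seq(TopicAndPartition("test", 0), TopicAndPartition("test", 1))))
        // {"version":1,"broker":2,"partitions":[{"topic":"test","partition":0},{"topic":"test","partition":1}]}
      }
    }

The version field mirrors the "version" -> 1 entry written by the patch, which gives the controller a way to recognize the payload format it is reading.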