diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 5c9307d..1087a2e 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -25,10 +25,11 @@ import kafka.log.LogConfig import kafka.server.ReplicaManager import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup -import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController} +import kafka.controller.KafkaController import org.apache.log4j.Logger import kafka.message.ByteBufferMessageSet import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping} +import java.io.IOException /** @@ -135,6 +136,22 @@ class Partition(val topic: String, assignedReplicaMap.remove(replicaId) } + def delete() { + // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted + leaderIsrUpdateLock synchronized { + assignedReplicaMap.clear() + inSyncReplicas = Set.empty[Replica] + leaderReplicaIdOpt = None + try { + logManager.deleteLog(TopicAndPartition(topic, partitionId)) + } catch { + case e: IOException => + fatal("Error deleting the log for partition [%s,%d]".format(topic, partitionId), e) + Runtime.getRuntime().halt(1) + } + } + } + def getLeaderEpoch(): Int = { leaderIsrUpdateLock synchronized { return this.leaderEpoch diff --git a/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala b/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala new file mode 100644 index 0000000..5ea6632 --- /dev/null +++ b/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * Thrown when a log cleaning task is requested to be aborted. + */ +class LogCleaningAbortedException() extends RuntimeException() { +} diff --git a/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala b/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala deleted file mode 100644 index 0e69110..0000000 --- a/core/src/main/scala/kafka/common/OptimisticLockFailureException.scala +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Thrown when an optimistic locking attempt receives concurrent modifications - */ -class OptimisticLockFailureException(message: String) extends RuntimeException(message) \ No newline at end of file diff --git a/core/src/main/scala/kafka/common/ThreadShutdownException.scala b/core/src/main/scala/kafka/common/ThreadShutdownException.scala new file mode 100644 index 0000000..6554a5e --- /dev/null +++ b/core/src/main/scala/kafka/common/ThreadShutdownException.scala @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * An exception that indicates a thread is being shut down normally. + */ +class ThreadShutdownException() extends RuntimeException { +} diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index beda421..b3ab522 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -63,12 +63,11 @@ class Log(val dir: File, private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] loadSegments() - /* The number of times the log has been truncated */ - private val truncates = new AtomicInteger(0) - /* Calculate the offset of the next message */ private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset()) + val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name) + info("Completed load of log %s with log end offset %d".format(name, logEndOffset)) newGauge(name + "-" + "NumLogSegments", @@ -202,11 +201,6 @@ class Log(val dir: File, def numberOfSegments: Int = segments.size /** - * The number of truncates that have occurred since the log was opened. - */ - def numberOfTruncates: Int = truncates.get - - /** * Close this log */ def close() { @@ -524,16 +518,19 @@ class Log(val dir: File, /** * Completely delete this log directory and all contents from the file system with no delay */ - def delete(): Unit = { - logSegments.foreach(_.delete()) - Utils.rm(dir) + private[log] def delete() { + lock synchronized { + logSegments.foreach(_.delete()) + segments.clear() + Utils.rm(dir) + } } /** * Truncate this log so that it ends with the greatest offset < targetOffset. 
* @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete. */ - def truncateTo(targetOffset: Long) { + private[log] def truncateTo(targetOffset: Long) { info("Truncating log %s to offset %d.".format(name, targetOffset)) if(targetOffset < 0) throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset)) @@ -551,7 +548,6 @@ class Log(val dir: File, this.nextOffset.set(targetOffset) this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) } - truncates.getAndIncrement } } @@ -559,7 +555,7 @@ class Log(val dir: File, * Delete all data in the log and start at the new offset * @param newOffset The new offset to start the log with */ - def truncateFullyAndStartAt(newOffset: Long) { + private[log] def truncateFullyAndStartAt(newOffset: Long) { debug("Truncate and start log '" + name + "' to " + newOffset) lock synchronized { val segmentsToDelete = logSegments.toList @@ -571,7 +567,6 @@ class Log(val dir: File, time = time)) this.nextOffset.set(newOffset) this.recoveryPoint = math.min(newOffset, this.recoveryPoint) - truncates.getAndIncrement } } @@ -650,10 +645,8 @@ class Log(val dir: File, * @param newSegment The new log segment to add to the log * @param oldSegments The old log segments to delete from the log */ - private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], expectedTruncates: Int) { + private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment]) { lock synchronized { - if(expectedTruncates != numberOfTruncates) - throw new OptimisticLockFailureException("The log has been truncated, expected %d but found %d.".format(expectedTruncates, numberOfTruncates)) // need to do this in two phases to be crash safe AND do the delete asynchronously // if we crash in the middle of this we complete the swap in loadSegments() newSegment.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix) @@ -735,5 +728,13 @@ object Log { def indexFilename(dir: File, offset: Long) = new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix) + + /** + * Parse the topic and partition out of the directory name of a log + */ + def parseTopicPartitionName(name: String): TopicAndPartition = { + val index = name.lastIndexOf('-') + TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt) + } } diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index ccde2ab..6404647 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -20,15 +20,12 @@ package kafka.log import scala.collection._ import scala.math import java.nio._ -import java.util.concurrent.Semaphore -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic._ import java.util.Date import java.io.File import kafka.common._ import kafka.message._ -import kafka.server.OffsetCheckpoint import kafka.utils._ +import java.lang.IllegalStateException /** * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy. 
@@ -67,19 +64,9 @@ class LogCleaner(val config: CleanerConfig, val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log], time: Time = SystemTime) extends Logging { - - /* the offset checkpoints holding the last cleaned point for each log */ - private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap - - /* the set of logs currently being cleaned */ - private val inProgress = mutable.HashSet[TopicAndPartition]() - - /* a global lock used to control all access to the in-progress set and the offset checkpoints */ - private val lock = new Object - - /* a counter for creating unique thread names*/ - private val threadId = new AtomicInteger(0) - + /* for managing the state of partitions being cleaned. */ + private val cleanerManager = new LogCleanerManager(logDirs, logs); + /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */ private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, checkIntervalMs = 300, @@ -87,10 +74,7 @@ class LogCleaner(val config: CleanerConfig, time = time) /* the threads */ - private val cleaners = (0 until config.numThreads).map(_ => new CleanerThread()) - - /* a hook for testing to synchronize on log cleaning completions */ - private val cleaned = new Semaphore(0) + private val cleaners = (0 until config.numThreads).map(new CleanerThread(_)) /** * Start the background cleaning @@ -105,102 +89,79 @@ class LogCleaner(val config: CleanerConfig, */ def shutdown() { info("Shutting down the log cleaner.") - cleaners.foreach(_.interrupt()) - cleaners.foreach(_.join()) + cleaners.foreach(_.shutdown()) } /** - * For testing, a way to know when work has completed. This method blocks until the - * cleaner has processed up to the given offset on the specified topic/partition + * Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of + * the partition is aborted. */ - def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = { - while(!allCleanerCheckpoints.contains(TopicAndPartition(topic, part))) - cleaned.tryAcquire(timeout, TimeUnit.MILLISECONDS) + def abortCleaning(topicAndPartition: TopicAndPartition) { + cleanerManager.abortCleaning(topicAndPartition) } - + /** - * @return the position processed for all logs. + * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition. + * This call blocks until the cleaning of the partition is aborted and paused. */ - def allCleanerCheckpoints(): Map[TopicAndPartition, Long] = - checkpoints.values.flatMap(_.read()).toMap - - /** - * Choose the log to clean next and add it to the in-progress set. We recompute this - * every time off the full set of logs to allow logs to be dynamically added to the pool of logs - * the log manager maintains. 
- */ - private def grabFilthiestLog(): Option[LogToClean] = { - lock synchronized { - val lastClean = allCleanerCheckpoints() - val cleanableLogs = logs.filter(l => l._2.config.dedupe) // skip any logs marked for delete rather than dedupe - .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress - .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each - val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0) // must have some bytes - .filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio - if(dirtyLogs.isEmpty) { - None - } else { - val filthiest = dirtyLogs.max - inProgress += filthiest.topicPartition - Some(filthiest) - } - } + def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) { + cleanerManager.abortAndPauseCleaning(topicAndPartition) } - + /** - * Save out the endOffset and remove the given log from the in-progress set. + * Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed. */ - private def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) { - lock synchronized { - val checkpoint = checkpoints(dataDir) - val offsets = checkpoint.read() + ((topicAndPartition, endOffset)) - checkpoint.write(offsets) - inProgress -= topicAndPartition - } - cleaned.release() + def resumeCleaning(topicAndPartition: TopicAndPartition) { + cleanerManager.resumeCleaning(topicAndPartition) } /** + * TODO: + * For testing, a way to know when work has completed. This method blocks until the + * cleaner has processed up to the given offset on the specified topic/partition + */ + def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = { + while(!cleanerManager.allCleanerCheckpoints.contains(TopicAndPartition(topic, part))) + Thread.sleep(10) + } + + /** * The cleaner threads do the actual log cleaning. Each thread does its cleaning repeatedly by * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments. 
*/ - private class CleanerThread extends Thread { + private class CleanerThread(threadId: Int) + extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) { if(config.dedupeBufferSize / config.numThreads > Int.MaxValue) warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...") - val cleaner = new Cleaner(id = threadId.getAndIncrement(), + + val cleaner = new Cleaner(id = threadId, offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt, hashAlgorithm = config.hashAlgorithm), ioBufferSize = config.ioBufferSize / config.numThreads / 2, maxIoBufferSize = config.maxMessageSize, dupBufferLoadFactor = config.dedupeBufferLoadFactor, throttler = throttler, - time = time) + time = time, + checkDone = checkDone) - setName("kafka-log-cleaner-thread-" + cleaner.id) - setDaemon(false) + private def checkDone(topicAndPartition: TopicAndPartition) { + if (!isRunning.get()) + throw new ThreadShutdownException + cleanerManager.checkCleaningAborted(topicAndPartition) + } /** * The main loop for the cleaner thread */ - override def run() { - info("Starting cleaner thread %d...".format(cleaner.id)) - try { - while(!isInterrupted) { - cleanOrSleep() - } - } catch { - case e: InterruptedException => // all done - case e: Exception => - error("Error in cleaner thread %d:".format(cleaner.id), e) - } - info("Shutting down cleaner thread %d.".format(cleaner.id)) + override def doWork() { + cleanOrSleep() } /** * Clean a log if there is a dirty log available, otherwise sleep for a bit */ private def cleanOrSleep() { - grabFilthiestLog() match { + cleanerManager.grabFilthiestLog() match { case None => // there are no cleanable logs, sleep a while time.sleep(config.backOffMs) @@ -211,10 +172,9 @@ class LogCleaner(val config: CleanerConfig, endOffset = cleaner.clean(cleanable) logStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats) } catch { - case e: OptimisticLockFailureException => - info("Cleaning of log was aborted due to colliding truncate operation.") + case pe: LogCleaningAbortedException => // task can be aborted, let it go. 
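+ // doneCleaning() in the finally block below moves the partition from the aborted state to paused and wakes up any abortAndPauseCleaning() call waiting on it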
} finally { - doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset) + cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset) } } } @@ -260,7 +220,8 @@ private[log] class Cleaner(val id: Int, maxIoBufferSize: Int, dupBufferLoadFactor: Double, throttler: Throttler, - time: Time) extends Logging { + time: Time, + checkDone: (TopicAndPartition) => Unit) extends Logging { this.logIdent = "Cleaner " + id + ": " @@ -284,8 +245,7 @@ private[log] class Cleaner(val id: Int, stats.clear() info("Beginning cleaning of log %s.".format(cleanable.log.name)) val log = cleanable.log - val truncateCount = log.numberOfTruncates - + // build the offset map info("Building offset map for %s...".format(cleanable.log.name)) val upperBoundOffset = log.activeSegment.baseOffset @@ -303,7 +263,7 @@ private[log] class Cleaner(val id: Int, // group the segments and clean the groups info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs))) for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) - cleanSegments(log, group, offsetMap, truncateCount, deleteHorizonMs) + cleanSegments(log, group, offsetMap, deleteHorizonMs) stats.allDone() endOffset @@ -318,10 +278,9 @@ private[log] class Cleaner(val id: Int, * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet * @param deleteHorizonMs The time to retain delete tombstones */ - private[log] def cleanSegments(log: Log, + private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, - expectedTruncateCount: Int, deleteHorizonMs: Long) { // create a new segment with the suffix .cleaned appended to both the log and index name val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix) @@ -332,32 +291,32 @@ private[log] class Cleaner(val id: Int, val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize) val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time) - // clean segments into the new destination segment - for (old <- segments) { - val retainDeletes = old.lastModified > deleteHorizonMs - info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes." - .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) - cleanInto(old, cleaned, map, retainDeletes) - } - - // trim excess index - index.trimToValidSize() - - // flush new segment to disk before swap - cleaned.flush() - - // update the modification date to retain the last modified date of the original files - val modified = segments.last.lastModified - cleaned.lastModified = modified - - // swap in new segment - info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name)) try { - log.replaceSegments(cleaned, segments, expectedTruncateCount) + // clean segments into the new destination segment + for (old <- segments) { + val retainDeletes = old.lastModified > deleteHorizonMs + info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes." 
+ .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) + cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes) + } + + // trim excess index + index.trimToValidSize() + + // flush new segment to disk before swap + cleaned.flush() + + // update the modification date to retain the last modified date of the original files + val modified = segments.last.lastModified + cleaned.lastModified = modified + + // swap in new segment + info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name)) + log.replaceSegments(cleaned, segments) } catch { - case e: OptimisticLockFailureException => + case e: LogCleaningAbortedException => cleaned.delete() - throw e + throw e } } @@ -372,10 +331,11 @@ private[log] class Cleaner(val id: Int, * * TODO: Implement proper compression support */ - private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) { + private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment, + dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) { var position = 0 while (position < source.log.sizeInBytes) { - checkDone() + checkDone(topicAndPartition) // read a chunk of messages and copy any that are to be retained to the write buffer to be written out readBuffer.clear() writeBuffer.clear() @@ -491,9 +451,9 @@ private[log] class Cleaner(val id: Int, require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name)) val minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong for (segment <- dirty) { - checkDone() + checkDone(log.topicAndPartition) if(segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor) - offset = buildOffsetMap(segment, map) + offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map) } info("Offset map for log %s complete.".format(log.name)) offset @@ -507,11 +467,11 @@ private[log] class Cleaner(val id: Int, * * @return The final offset covered by the map */ - private def buildOffsetMap(segment: LogSegment, map: OffsetMap): Long = { + private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = { var position = 0 var offset = segment.baseOffset while (position < segment.log.sizeInBytes) { - checkDone() + checkDone(topicAndPartition) readBuffer.clear() val messages = new ByteBufferMessageSet(segment.log.readInto(readBuffer, position)) throttler.maybeThrottle(messages.sizeInBytes) @@ -532,14 +492,6 @@ private[log] class Cleaner(val id: Int, restoreBuffers() offset } - - /** - * If we aren't running any more throw an AllDoneException - */ - private def checkDone() { - if (Thread.currentThread.isInterrupted) - throw new InterruptedException - } } /** diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala new file mode 100644 index 0000000..1612c8d --- /dev/null +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.log + +import java.io.File +import kafka.utils.{Logging, Pool} +import kafka.server.OffsetCheckpoint +import collection.mutable +import java.util.concurrent.locks.ReentrantLock +import kafka.utils.Utils._ +import java.util.concurrent.TimeUnit +import kafka.common.{LogCleaningAbortedException, TopicAndPartition} + +private[log] sealed trait LogCleaningState +private[log] case object LogCleaningInProgress extends LogCleaningState +private[log] case object LogCleaningAborted extends LogCleaningState +private[log] case object LogCleaningPaused extends LogCleaningState + +/** + * Manage the state of each partition being cleaned. + * If a partition is to be cleaned, it enters the LogCleaningInProgress state. + * While a partition is being cleaned, it can be requested to be aborted and paused. Then the partition first enters + * the LogCleaningAborted state. Once the cleaning task is aborted, the partition enters the LogCleaningPaused state. + * While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is + * requested to be resumed. + */ +private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging { + /* the offset checkpoints holding the last cleaned point for each log */ + private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap + + /* the set of logs currently being cleaned */ + private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]() + + /* a global lock used to control all access to the in-progress set and the offset checkpoints */ + private val lock = new ReentrantLock + /* for coordinating the pausing and the cleaning of a partition */ + private val pausedCleaningCond = lock.newCondition() + + /** + * @return the position processed for all logs. + */ + def allCleanerCheckpoints(): Map[TopicAndPartition, Long] = + checkpoints.values.flatMap(_.read()).toMap + + /** + * Choose the log to clean next and add it to the in-progress set. We recompute this + * every time off the full set of logs to allow logs to be dynamically added to the pool of logs + * the log manager maintains. 
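+ * Only logs with the dedupe retention setting that are not already in progress (or paused) are considered.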
+ */ + def grabFilthiestLog(): Option[LogToClean] = { + inLock(lock) { + val lastClean = allCleanerCheckpoints() + val cleanableLogs = logs.filter(l => l._2.config.dedupe) // skip any logs marked for delete rather than dedupe + .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress + .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each + val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0) // must have some bytes + .filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio + if(dirtyLogs.isEmpty) { + None + } else { + val filthiest = dirtyLogs.max + inProgress.put(filthiest.topicPartition, LogCleaningInProgress) + Some(filthiest) + } + } + } + + /** + * Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of + * the partition is aborted. + * This is implemented by first aborting and pausing the cleaning, and then resuming it. + */ + def abortCleaning(topicAndPartition: TopicAndPartition) { + inLock(lock) { + abortAndPauseCleaning(topicAndPartition) + resumeCleaning(topicAndPartition) + info("The cleaning for partition %s is aborted".format(topicAndPartition)) + } + } + + /** + * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition. + * This call blocks until the cleaning of the partition is aborted and paused. + * 1. If the partition is not in progress, mark it as paused. + * 2. Otherwise, first mark the state of the partition as aborted. + * 3. The cleaner thread checks the state periodically and if it sees the state of the partition is aborted, it + * throws a LogCleaningAbortedException to stop the cleaning task. + * 4. When the cleaning task is stopped, doneCleaning() is called, which sets the state of the partition as paused. + * 5. abortAndPauseCleaning() waits until the state of the partition is changed to paused. + */ + def abortAndPauseCleaning(topicAndPartition: TopicAndPartition) { + inLock(lock) { + inProgress.get(topicAndPartition) match { + case None => + inProgress.put(topicAndPartition, LogCleaningPaused) + case Some(state) => + state match { + case LogCleaningInProgress => + inProgress.put(topicAndPartition, LogCleaningAborted) + case s => + throw new IllegalStateException(("Partition %s can't be aborted and paused since it's in %s state").format(topicAndPartition, s)) + } + } + while (!isCleaningInState(topicAndPartition, LogCleaningPaused)) + pausedCleaningCond.await(100, TimeUnit.MILLISECONDS) + info("The cleaning for partition %s is aborted and paused".format(topicAndPartition)) + } + } + + /** + * Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed. + */ + def resumeCleaning(topicAndPartition: TopicAndPartition) { + inLock(lock) { + inProgress.get(topicAndPartition) match { + case None => + throw new IllegalStateException(("Partition %s can't be resumed since it's never paused").format(topicAndPartition)) + case Some(state) => + state match { + case LogCleaningPaused => + inProgress.remove(topicAndPartition) + case s => + throw new IllegalStateException(("Partition %s can't be resumed since it's in %s state").format(topicAndPartition, s)) + } + } + } + info("The cleaning for partition %s is resumed".format(topicAndPartition)) + } + + /** + * Check if the cleaning for a partition is in a particular state. 
The caller is expected to hold the lock while making the call. + */ + def isCleaningInState(topicAndPartition: TopicAndPartition, expectedState: LogCleaningState): Boolean = { + inProgress.get(topicAndPartition) match { + case None => return false + case Some(state) => + if (state == expectedState) + return true + else + return false + } + } + + /** + * Check if the cleaning for a partition is aborted. If so, throw an exception. + */ + def checkCleaningAborted(topicAndPartition: TopicAndPartition) { + inLock(lock) { + if (isCleaningInState(topicAndPartition, LogCleaningAborted)) + throw new LogCleaningAbortedException() + } + } + + /** + * Save out the endOffset and remove the given log from the in-progress set, if not aborted. + */ + def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) { + inLock(lock) { + inProgress(topicAndPartition) match { + case LogCleaningInProgress => + val checkpoint = checkpoints(dataDir) + val offsets = checkpoint.read() + ((topicAndPartition, endOffset)) + checkpoint.write(offsets) + inProgress.remove(topicAndPartition) + case LogCleaningAborted => + inProgress.put(topicAndPartition, LogCleaningPaused) + pausedCleaningCond.signalAll() + case s => + throw new IllegalStateException(("In-progress partition %s can't be in %s state").format(topicAndPartition, s)) + } + } + } +} diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 81be88a..10062af 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -22,10 +22,8 @@ import java.util.concurrent.TimeUnit import kafka.utils._ import scala.collection._ import kafka.common.{TopicAndPartition, KafkaException} -import kafka.server.KafkaConfig import kafka.server.OffsetCheckpoint - /** * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. * All read and write operations are delegated to the individual log instances. @@ -50,9 +48,9 @@ class LogManager(val logDirs: Array[File], val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" val LockFile = ".lock" val InitialTaskDelayMs = 30*1000 - private val logCreationLock = new Object + private val logCreationOrDeletionLock = new Object private val logs = new Pool[TopicAndPartition, Log]() - + createAndValidateLogDirs(logDirs) private var dirLocks = lockLogDirs(logDirs) private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap @@ -115,7 +113,7 @@ class LogManager(val logDirs: Array[File], for(dir <- subDirs) { if(dir.isDirectory) { info("Loading log '" + dir.getName + "'") - val topicPartition = parseTopicPartitionName(dir.getName) + val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, @@ -194,12 +192,36 @@ class LogManager(val logDirs: Array[File], val log = logs.get(topicAndPartition) // If the log does not exist, skip it if (log != null) { + //May need to abort and pause the cleaning of the log, and resume after truncation is done. 
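+ //The cleaner only works on segments below the active segment (its upper bound is the active segment's base offset), so cleaning only needs to be stopped when truncating below that point.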
+ val needToStopCleaner: Boolean = (truncateOffset < log.activeSegment.baseOffset) + if (needToStopCleaner && cleaner != null) + cleaner.abortAndPauseCleaning(topicAndPartition) log.truncateTo(truncateOffset) + if (needToStopCleaner && cleaner != null) + cleaner.resumeCleaning(topicAndPartition) } } checkpointRecoveryPointOffsets() } - + + /** + * Delete all data in a partition and start the log at the new offset + * @param newOffset The new offset to start the log with + */ + def truncateFullyAndStartAt(topicAndPartition: TopicAndPartition, newOffset: Long) { + val log = logs.get(topicAndPartition) + // If the log does not exist, skip it + if (log != null) { + //Abort and pause the cleaning of the log, and resume after truncation is done. + if (cleaner != null) + cleaner.abortAndPauseCleaning(topicAndPartition) + log.truncateFullyAndStartAt(newOffset) + if (cleaner != null) + cleaner.resumeCleaning(topicAndPartition) + } + checkpointRecoveryPointOffsets() + } + /** * Write out the current recovery point for all logs to a text file in the log directory * to avoid recovering the whole log on startup. @@ -229,7 +251,7 @@ class LogManager(val logDirs: Array[File], * If the log already exists, just return a copy of the existing log */ def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = { - logCreationLock synchronized { + logCreationOrDeletionLock synchronized { var log = logs.get(topicAndPartition) // check if the log has already been created in another thread @@ -254,7 +276,27 @@ class LogManager(val logDirs: Array[File], log } } - + + /** + * Delete a log. + */ + def deleteLog(topicAndPartition: TopicAndPartition) { + var removedLog: Log = null + logCreationOrDeletionLock synchronized { + removedLog = logs.remove(topicAndPartition) + } + if (removedLog != null) { + //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. + if (cleaner != null) + cleaner.abortCleaning(topicAndPartition) + removedLog.delete() + info("Deleted log for partition [%s,%d] in %s." + .format(topicAndPartition.topic, + topicAndPartition.partition, + removedLog.dir.getAbsolutePath)) + } + } + /** * Choose the next directory in which to create a log. 
Currently this is done * by calculating the number of partitions in each directory and then choosing the @@ -280,7 +322,6 @@ class LogManager(val logDirs: Array[File], */ private def cleanupExpiredSegments(log: Log): Int = { val startMs = time.milliseconds - val topic = parseTopicPartitionName(log.name).topic log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs) } @@ -289,7 +330,6 @@ class LogManager(val logDirs: Array[File], * is at least logRetentionSize bytes in size */ private def cleanupSegmentsToMaintainSize(log: Log): Int = { - val topic = parseTopicPartitionName(log.dir.getName).topic if(log.config.retentionSize < 0 || log.size < log.config.retentionSize) return 0 var diff = log.size - log.config.retentionSize @@ -334,6 +374,7 @@ class LogManager(val logDirs: Array[File], */ private def flushDirtyLogs() = { debug("Checking for dirty logs to flush...") + for ((topicAndPartition, log) <- logs) { try { val timeSinceLastFlush = time.milliseconds - log.lastFlushTime @@ -344,22 +385,7 @@ class LogManager(val logDirs: Array[File], } catch { case e: Throwable => error("Error flushing topic " + topicAndPartition.topic, e) - e match { - case _: IOException => - fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e) - System.exit(1) - case _ => - } } } } - - /** - * Parse the topic and partition out of the directory name of a log - */ - private def parseTopicPartitionName(name: String): TopicAndPartition = { - val index = name.lastIndexOf('-') - TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt) - } - } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 715845b..73e605e 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -68,7 +68,6 @@ class ReplicaFetcherThread(name:String, */ def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = { val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get - val log = replica.log.get /** * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up @@ -81,8 +80,8 @@ class ReplicaFetcherThread(name:String, * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now. */ val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId) - if (leaderEndOffset < log.logEndOffset) { - log.truncateTo(leaderEndOffset) + if (leaderEndOffset < replica.logEndOffset) { + replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset)) warn("Replica %d for partition %s reset its fetch offset to current leader %d's latest offset %d" .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderEndOffset)) leaderEndOffset @@ -94,7 +93,7 @@ class ReplicaFetcherThread(name:String, * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching. 
*/ val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId) - log.truncateFullyAndStartAt(leaderStartOffset) + replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset) warn("Replica %d for partition %s reset its fetch offset to current leader %d's start offset %d" .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderStartOffset)) leaderStartOffset diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 242c18d..f9d10d3 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -29,7 +29,7 @@ import com.yammer.metrics.core.Gauge import java.util.concurrent.TimeUnit import kafka.common._ import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest} -import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController} +import kafka.controller.KafkaController import org.apache.log4j.Logger @@ -116,16 +116,16 @@ class ReplicaManager(val config: KafkaConfig, def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = { stateChangeLogger.trace("Broker %d handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId)) val errorCode = ErrorMapping.NoError - getReplica(topic, partitionId) match { - case Some(replica) => - /* TODO: handle deleteLog in a better way */ - //if (deletePartition) - // logManager.deleteLog(topic, partition) + getPartition(topic, partitionId) match { + case Some(partition) => leaderPartitionsLock synchronized { - leaderPartitions -= replica.partition + leaderPartitions -= partition + } + if(deletePartition) { + val removedPartition = allPartitions.remove((topic, partitionId)) + if (removedPartition != null) + removedPartition.delete() // this will delete the local log } - if(deletePartition) - allPartitions.remove((topic, partitionId)) case None => //do nothing if replica no longer exists } stateChangeLogger.trace("Broker %d finished handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId)) diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 52d35a3..623efb1 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -23,9 +23,10 @@ import java.util.Properties import kafka.utils._ import kafka.log._ import kafka.zk.ZooKeeperTestHarness -import kafka.server.{KafkaServer, KafkaConfig} import kafka.utils.{Logging, ZkUtils, TestUtils} -import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition} +import kafka.common.{TopicExistsException, TopicAndPartition} +import kafka.server.{KafkaServer, KafkaConfig} +import java.io.File class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @@ -132,6 +133,12 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { } } + private def getBrokersWithPartitionDir(servers: Iterable[KafkaServer], topic: String, partitionId: Int): Set[Int] = { + servers.filter {case server => new File(server.config.logDirs.head, topic + "-" + partitionId).exists} + .map(_.config.brokerId) + .toSet + } + @Test def testPartitionReassignmentWithLeaderInNewReplicas() { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) @@ -157,6 +164,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with 
Logging { checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas) assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers) + assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000)) servers.foreach(_.shutdown()) } @@ -184,6 +192,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas) ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers) + assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000)) servers.foreach(_.shutdown()) } @@ -211,6 +220,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas) checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas) ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers) + assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000)) servers.foreach(_.shutdown()) } @@ -251,6 +261,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas) // ensure that there are no under replicated partitions ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers) + assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000)) servers.foreach(_.shutdown()) } diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala index 5a312bf..51cd94b 100644 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -19,7 +19,7 @@ package kafka.log import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite -import org.junit.{After, Before, Test} +import org.junit.{After, Test} import java.nio._ import java.io.File import scala.collection._ @@ -62,7 +62,7 @@ class CleanerTest extends JUnitSuite { keys.foreach(k => map.put(key(k), Long.MaxValue)) // clean the log - cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0, 0L) + cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L) val shouldRemain = keysInLog(log).filter(!keys.contains(_)) assertEquals(shouldRemain, keysInLog(log)) } @@ -94,29 +94,31 @@ class CleanerTest extends JUnitSuite { /* extract all the keys from a log */ def keysInLog(log: Log): Iterable[Int] = log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).map(m => Utils.readString(m.message.key).toInt)) - - + + def abortCheckDone(topicAndPartition: TopicAndPartition) { + throw new LogCleaningAbortedException() + } + /** - * Test that a truncation during cleaning throws an OptimisticLockFailureException + * Test that abortion during cleaning throws a LogCleaningAbortedException */ @Test - def testCleanSegmentsWithTruncation() { - val cleaner = makeCleaner(Int.MaxValue) + def testCleanSegmentsWithAbort() { + val cleaner = makeCleaner(Int.MaxValue, abortCheckDone) val log = makeLog(config 
= logConfig.copy(segmentSize = 1024)) - + // append messages to the log until we have four segments - while(log.numberOfSegments < 2) + while(log.numberOfSegments < 4) log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt)) - - log.truncateTo(log.logEndOffset-2) + val keys = keysInLog(log) val map = new FakeOffsetMap(Int.MaxValue) keys.foreach(k => map.put(key(k), Long.MaxValue)) - intercept[OptimisticLockFailureException] { - cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0, 0L) + intercept[LogCleaningAbortedException] { + cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L) } } - + /** * Validate the logic for grouping log segments together for cleaning */ @@ -196,15 +198,18 @@ class CleanerTest extends JUnitSuite { def makeLog(dir: File = dir, config: LogConfig = logConfig) = new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) - - def makeCleaner(capacity: Int) = + + def noOpCheckDone(topicAndPartition: TopicAndPartition) { /* do nothing */ } + + def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone) = new Cleaner(id = 0, offsetMap = new FakeOffsetMap(capacity), ioBufferSize = 64*1024, maxIoBufferSize = 64*1024, dupBufferLoadFactor = 0.75, throttler = throttler, - time = time) + time = time, + checkDone = checkDone ) def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = { for((key, value) <- seq)