diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 02ccc17..a033639 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -25,10 +25,11 @@ import kafka.log.LogConfig import kafka.server.ReplicaManager import com.yammer.metrics.core.Gauge import kafka.metrics.KafkaMetricsGroup -import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController} +import kafka.controller.KafkaController import org.apache.log4j.Logger import kafka.message.ByteBufferMessageSet import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping} +import java.io.IOException /** @@ -135,6 +136,22 @@ class Partition(val topic: String, assignedReplicaMap.remove(replicaId) } + def delete() { + // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to log being deleted + leaderIsrUpdateLock synchronized { + assignedReplicaMap.clear() + inSyncReplicas = Set.empty[Replica] + leaderReplicaIdOpt = None + try { + logManager.deleteLog(TopicAndPartition(topic, partitionId)) + } catch { + case e: IOException => + fatal("Error deleting the log for partition [%s,%d]".format(topic, partitionId), e) + Runtime.getRuntime().halt(1) + } + } + } + def getLeaderEpoch(): Int = { leaderIsrUpdateLock synchronized { return this.leaderEpoch diff --git a/core/src/main/scala/kafka/common/AllDoneException.scala b/core/src/main/scala/kafka/common/AllDoneException.scala new file mode 100644 index 0000000..a32305f --- /dev/null +++ b/core/src/main/scala/kafka/common/AllDoneException.scala @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * An exception that indicates no more work to do in a thread, typically due to a thread is shut down. + */ +class AllDoneException() extends RuntimeException { +} diff --git a/core/src/main/scala/kafka/common/CancelledCleaningException.scala b/core/src/main/scala/kafka/common/CancelledCleaningException.scala new file mode 100644 index 0000000..fa9ed10 --- /dev/null +++ b/core/src/main/scala/kafka/common/CancelledCleaningException.scala @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +/** + * Thrown when a log cleaning task is cancelled. + */ +class CancelledCleaningException(message: String, cause: Throwable) extends RuntimeException(message, cause) { + def this(message: String) = this(message, null) + def this() = this(null, null) +} diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index beda421..21b70cc 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -69,6 +69,8 @@ class Log(val dir: File, /* Calculate the offset of the next message */ private val nextOffset: AtomicLong = new AtomicLong(activeSegment.nextOffset()) + val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name) + info("Completed load of log %s with log end offset %d".format(name, logEndOffset)) newGauge(name + "-" + "NumLogSegments", @@ -525,8 +527,11 @@ class Log(val dir: File, * Completely delete this log directory and all contents from the file system with no delay */ def delete(): Unit = { - logSegments.foreach(_.delete()) - Utils.rm(dir) + lock synchronized { + logSegments.foreach(_.delete()) + segments.clear() + Utils.rm(dir) + } } /** @@ -735,5 +740,13 @@ object Log { def indexFilename(dir: File, offset: Long) = new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix) + + /** + * Parse the topic and partition out of the directory name of a log + */ + def parseTopicPartitionName(name: String): TopicAndPartition = { + val index = name.lastIndexOf('-') + TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt) + } } diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index ccde2ab..2a89500 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -22,13 +22,14 @@ import scala.math import java.nio._ import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic._ import java.util.Date import java.io.File import kafka.common._ import kafka.message._ import kafka.server.OffsetCheckpoint import kafka.utils._ +import kafka.utils.Utils.inLock +import java.util.concurrent.locks.ReentrantLock /** * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy. 
@@ -74,12 +75,13 @@ class LogCleaner(val config: CleanerConfig, /* the set of logs currently being cleaned */ private val inProgress = mutable.HashSet[TopicAndPartition]() + /* the set of log clean tasks to be cancelled */ + private val cancelled = mutable.HashSet[TopicAndPartition]() + /* a global lock used to control all access to the in-progress set and the offset checkpoints */ - private val lock = new Object + private val lock = new ReentrantLock + private val cond = lock.newCondition() - /* a counter for creating unique thread names*/ - private val threadId = new AtomicInteger(0) - /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */ private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, checkIntervalMs = 300, @@ -87,7 +89,7 @@ class LogCleaner(val config: CleanerConfig, time = time) /* the threads */ - private val cleaners = (0 until config.numThreads).map(_ => new CleanerThread()) + private val cleaners = (0 until config.numThreads).map(new CleanerThread(_)) /* a hook for testing to synchronize on log cleaning completions */ private val cleaned = new Semaphore(0) @@ -105,8 +107,7 @@ class LogCleaner(val config: CleanerConfig, */ def shutdown() { info("Shutting down the log cleaner.") - cleaners.foreach(_.interrupt()) - cleaners.foreach(_.join()) + cleaners.foreach(_.shutdown()) } /** @@ -130,7 +131,7 @@ class LogCleaner(val config: CleanerConfig, * the log manager maintains. */ private def grabFilthiestLog(): Option[LogToClean] = { - lock synchronized { + inLock(lock) { val lastClean = allCleanerCheckpoints() val cleanableLogs = logs.filter(l => l._2.config.dedupe) // skip any logs marked for delete rather than dedupe .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress @@ -146,16 +147,36 @@ class LogCleaner(val config: CleanerConfig, } } } - + + /** + * Cancel the cleaning of a particular log, if it's in progress. + */ + def cancelCleaning(topicAndPartition: TopicAndPartition) { + inLock(lock) { + cancelled += topicAndPartition + while (inProgress.contains(topicAndPartition)) + cond.await(100, TimeUnit.MILLISECONDS) + cancelled -= topicAndPartition + } + } + + private def checkCleaningCancelled(topicAndPartition: TopicAndPartition) { + inLock(lock) { + if (cancelled.contains(topicAndPartition)) + throw new CancelledCleaningException("The cleaning for partition %s is cancelled".format(topicAndPartition)) + } + } + /** * Save out the endOffset and remove the given log from the in-progress set. */ private def doneCleaning(topicAndPartition: TopicAndPartition, dataDir: File, endOffset: Long) { - lock synchronized { + inLock(lock) { val checkpoint = checkpoints(dataDir) val offsets = checkpoint.read() + ((topicAndPartition, endOffset)) checkpoint.write(offsets) inProgress -= topicAndPartition + cond.signalAll() } cleaned.release() } @@ -164,36 +185,31 @@ class LogCleaner(val config: CleanerConfig, * The cleaner threads do the actual log cleaning. Each thread processes does its cleaning repeatedly by * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments. 
*/ - private class CleanerThread extends Thread { + private class CleanerThread(threadId: Int) extends ShutdownableThread("kafka-log-cleaner-thread-" + threadId, false) { if(config.dedupeBufferSize / config.numThreads > Int.MaxValue) warn("Cannot use more than 2G of cleaner buffer space per cleaner thread, ignoring excess buffer space...") - val cleaner = new Cleaner(id = threadId.getAndIncrement(), + + val cleaner = new Cleaner(id = threadId, offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt, hashAlgorithm = config.hashAlgorithm), ioBufferSize = config.ioBufferSize / config.numThreads / 2, maxIoBufferSize = config.maxMessageSize, dupBufferLoadFactor = config.dedupeBufferLoadFactor, throttler = throttler, - time = time) + time = time, + checkDone = checkDone) - setName("kafka-log-cleaner-thread-" + cleaner.id) - setDaemon(false) + private def checkDone(topicAndPartition: TopicAndPartition) { + if (!isRunning.get()) + throw new AllDoneException + checkCleaningCancelled(topicAndPartition) + } /** * The main loop for the cleaner thread */ - override def run() { - info("Starting cleaner thread %d...".format(cleaner.id)) - try { - while(!isInterrupted) { - cleanOrSleep() - } - } catch { - case e: InterruptedException => // all done - case e: Exception => - error("Error in cleaner thread %d:".format(cleaner.id), e) - } - info("Shutting down cleaner thread %d.".format(cleaner.id)) + override def doWork() { + cleanOrSleep() } /** @@ -211,8 +227,10 @@ class LogCleaner(val config: CleanerConfig, endOffset = cleaner.clean(cleanable) logStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats) } catch { - case e: OptimisticLockFailureException => + case oe: OptimisticLockFailureException => info("Cleaning of log was aborted due to colliding truncate operation.") + case ce: CancelledCleaningException => + info(ce.getMessage) } finally { doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset) } @@ -260,7 +278,8 @@ private[log] class Cleaner(val id: Int, maxIoBufferSize: Int, dupBufferLoadFactor: Double, throttler: Throttler, - time: Time) extends Logging { + time: Time, + checkDone: (TopicAndPartition) => Unit) extends Logging { this.logIdent = "Cleaner " + id + ": " @@ -318,7 +337,7 @@ private[log] class Cleaner(val id: Int, * @param expectedTruncateCount A count used to check if the log is being truncated and rewritten under our feet * @param deleteHorizonMs The time to retain delete tombstones */ - private[log] def cleanSegments(log: Log, + private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, expectedTruncateCount: Int, @@ -332,32 +351,38 @@ private[log] class Cleaner(val id: Int, val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize) val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time) - // clean segments into the new destination segment - for (old <- segments) { - val retainDeletes = old.lastModified > deleteHorizonMs - info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes." 
- .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) - cleanInto(old, cleaned, map, retainDeletes) - } - - // trim excess index - index.trimToValidSize() - - // flush new segment to disk before swap - cleaned.flush() - - // update the modification date to retain the last modified date of the original files - val modified = segments.last.lastModified - cleaned.lastModified = modified - - // swap in new segment - info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name)) try { - log.replaceSegments(cleaned, segments, expectedTruncateCount) + // clean segments into the new destination segment + for (old <- segments) { + val retainDeletes = old.lastModified > deleteHorizonMs + info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes." + .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding")) + cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes) + } + + // trim excess index + index.trimToValidSize() + + // flush new segment to disk before swap + cleaned.flush() + + // update the modification date to retain the last modified date of the original files + val modified = segments.last.lastModified + cleaned.lastModified = modified + + // swap in new segment + info("Swapping in cleaned segment %d for segment(s) %s in log %s.".format(cleaned.baseOffset, segments.map(_.baseOffset).mkString(","), log.name)) + try { + log.replaceSegments(cleaned, segments, expectedTruncateCount) + } catch { + case e: OptimisticLockFailureException => + cleaned.delete() + throw e + } } catch { - case e: OptimisticLockFailureException => + case e: CancelledCleaningException => cleaned.delete() - throw e + throw e } } @@ -372,10 +397,11 @@ private[log] class Cleaner(val id: Int, * * TODO: Implement proper compression support */ - private[log] def cleanInto(source: LogSegment, dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) { + private[log] def cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment, + dest: LogSegment, map: OffsetMap, retainDeletes: Boolean) { var position = 0 while (position < source.log.sizeInBytes) { - checkDone() + checkDone(topicAndPartition) // read a chunk of messages and copy any that are to be retained to the write buffer to be written out readBuffer.clear() writeBuffer.clear() @@ -491,9 +517,9 @@ private[log] class Cleaner(val id: Int, require(offset == start, "Last clean offset is %d but segment base offset is %d for log %s.".format(start, offset, log.name)) val minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong for (segment <- dirty) { - checkDone() + checkDone(log.topicAndPartition) if(segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor) - offset = buildOffsetMap(segment, map) + offset = buildOffsetMap(log.topicAndPartition, segment, map) } info("Offset map for log %s complete.".format(log.name)) offset @@ -507,11 +533,11 @@ private[log] class Cleaner(val id: Int, * * @return The final offset covered by the map */ - private def buildOffsetMap(segment: LogSegment, map: OffsetMap): Long = { + private def buildOffsetMap(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap): Long = { var position = 0 var offset = segment.baseOffset while (position < segment.log.sizeInBytes) { - checkDone() + checkDone(topicAndPartition) 
readBuffer.clear() val messages = new ByteBufferMessageSet(segment.log.readInto(readBuffer, position)) throttler.maybeThrottle(messages.sizeInBytes) @@ -532,14 +558,6 @@ private[log] class Cleaner(val id: Int, restoreBuffers() offset } - - /** - * If we aren't running any more throw an AllDoneException - */ - private def checkDone() { - if (Thread.currentThread.isInterrupted) - throw new InterruptedException - } } /** diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 81be88a..e41e35b 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -22,9 +22,9 @@ import java.util.concurrent.TimeUnit import kafka.utils._ import scala.collection._ import kafka.common.{TopicAndPartition, KafkaException} -import kafka.server.KafkaConfig import kafka.server.OffsetCheckpoint - +import kafka.utils.Utils.inLock +import java.util.concurrent.locks.ReentrantLock /** * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning. @@ -50,9 +50,14 @@ class LogManager(val logDirs: Array[File], val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" val LockFile = ".lock" val InitialTaskDelayMs = 30*1000 - private val logCreationLock = new Object + private val logCreationOrDeletionLock = new Object private val logs = new Pool[TopicAndPartition, Log]() - + private var flushInProgress: Boolean = false + + /* a lock for synchronization btw the deletion and the flush of a log */ + private val deleteLock = new ReentrantLock + private val deleteCond = deleteLock.newCondition() + createAndValidateLogDirs(logDirs) private var dirLocks = lockLogDirs(logDirs) private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap @@ -115,7 +120,7 @@ class LogManager(val logDirs: Array[File], for(dir <- subDirs) { if(dir.isDirectory) { info("Loading log '" + dir.getName + "'") - val topicPartition = parseTopicPartitionName(dir.getName) + val topicPartition = Log.parseTopicPartitionName(dir.getName) val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) val log = new Log(dir, config, @@ -229,7 +234,7 @@ class LogManager(val logDirs: Array[File], * If the log already exists, just return a copy of the existing log */ def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = { - logCreationLock synchronized { + logCreationOrDeletionLock synchronized { var log = logs.get(topicAndPartition) // check if the log has already been created in another thread @@ -254,7 +259,28 @@ class LogManager(val logDirs: Array[File], log } } - + + /** + * Delete a log. + */ + def deleteLog(topicAndPartition: TopicAndPartition) { + var removedLog: Log = null + logCreationOrDeletionLock synchronized { + removedLog = logs.remove(topicAndPartition) + } + if (removedLog != null) { + //We need to wait until there is no more cleaning and flushing tasks on the log to be deleted before actually deleting it. + if (cleaner != null) + cleaner.cancelCleaning(topicAndPartition) + waitUntilInProgressFlusherIsDone(topicAndPartition) + removedLog.delete() + info("Deleted log for partition [%s,%d] in %s." + .format(topicAndPartition.topic, + topicAndPartition.partition, + removedLog.dir.getAbsolutePath)) + } + } + /** * Choose the next directory in which to create a log. 
Currently this is done * by calculating the number of partitions in each directory and then choosing the @@ -280,7 +306,6 @@ class LogManager(val logDirs: Array[File], */ private def cleanupExpiredSegments(log: Log): Int = { val startMs = time.milliseconds - val topic = parseTopicPartitionName(log.name).topic log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs) } @@ -289,7 +314,6 @@ class LogManager(val logDirs: Array[File], * is at least logRetentionSize bytes in size */ private def cleanupSegmentsToMaintainSize(log: Log): Int = { - val topic = parseTopicPartitionName(log.dir.getName).topic if(log.config.retentionSize < 0 || log.size < log.config.retentionSize) return 0 var diff = log.size - log.config.retentionSize @@ -334,6 +358,8 @@ class LogManager(val logDirs: Array[File], */ private def flushDirtyLogs() = { debug("Checking for dirty logs to flush...") + + markFlushInProgress() for ((topicAndPartition, log) <- logs) { try { val timeSinceLastFlush = time.milliseconds - log.lastFlushTime @@ -352,14 +378,26 @@ class LogManager(val logDirs: Array[File], } } } + markFlushDone() } - /** - * Parse the topic and partition out of the directory name of a log - */ - private def parseTopicPartitionName(name: String): TopicAndPartition = { - val index = name.lastIndexOf('-') - TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt) + private def waitUntilInProgressFlusherIsDone(topicAndPartition: TopicAndPartition) { + inLock(deleteLock) { + while (flushInProgress) + deleteCond.await(100, TimeUnit.MILLISECONDS) + } } + private def markFlushInProgress() { + inLock(deleteLock) { + flushInProgress = true + } + } + + private def markFlushDone() { + inLock(deleteLock) { + flushInProgress = false + deleteCond.signalAll() + } + } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index f9c7c29..d56f221 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -29,7 +29,7 @@ import com.yammer.metrics.core.Gauge import java.util.concurrent.TimeUnit import kafka.common._ import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest} -import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController} +import kafka.controller.KafkaController import org.apache.log4j.Logger @@ -115,16 +115,16 @@ class ReplicaManager(val config: KafkaConfig, def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = { stateChangeLogger.trace("Broker %d handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId)) val errorCode = ErrorMapping.NoError - getReplica(topic, partitionId) match { - case Some(replica) => - /* TODO: handle deleteLog in a better way */ - //if (deletePartition) - // logManager.deleteLog(topic, partition) + getPartition(topic, partitionId) match { + case Some(partition) => leaderPartitionsLock synchronized { - leaderPartitions -= replica.partition + leaderPartitions -= partition + } + if(deletePartition) { + val removedPartition = allPartitions.remove((topic, partitionId)) + if (removedPartition != null) + removedPartition.delete() // this will delete the local log } - if(deletePartition) - allPartitions.remove((topic, partitionId)) case None => //do nothing if replica no longer exists } stateChangeLogger.trace("Broker %d finished handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId)) diff --git 
a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index c30069e..9bd7297 100644 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -23,9 +23,10 @@ import java.util.Properties import kafka.utils._ import kafka.log._ import kafka.zk.ZooKeeperTestHarness -import kafka.server.KafkaConfig import kafka.utils.{Logging, ZkUtils, TestUtils} -import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition} +import kafka.common.{TopicExistsException, TopicAndPartition} +import java.io.File +import kafka.server.{KafkaServer, KafkaConfig} class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @@ -132,6 +133,16 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { } } + private def getPartitionDirCountInBrokers(servers: Iterable[KafkaServer], topic: String, partitionId: Int) = { + servers.foldLeft(0) { (total, server) => + val logFile = new File(server.config.logDirs.head, topic + "-" + partitionId) + if (logFile.exists()) + total + 1 + else + total + } + } + @Test def testPartitionReassignmentWithLeaderInNewReplicas() { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) @@ -154,6 +165,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { }, 1000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) + assertTrue(TestUtils.waitUntilTrue(() => getPartitionDirCountInBrokers(servers, topic, 0) == 3, 5000)) servers.foreach(_.shutdown()) } @@ -179,7 +191,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { }, 1000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas) - // leader should be 2 + assertTrue(TestUtils.waitUntilTrue(() => getPartitionDirCountInBrokers(servers, topic, 0) == 3, 5000)) servers.foreach(_.shutdown()) } @@ -205,7 +217,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { }, 2000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas) - // leader should be 2 + assertTrue(TestUtils.waitUntilTrue(() => getPartitionDirCountInBrokers(servers, topic, 0) == 2, 5000)) servers.foreach(_.shutdown()) } @@ -222,7 +234,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions()) val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient) assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition)) - // leader should be 2 servers.foreach(_.shutdown()) } @@ -244,6 +255,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1000) val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned) assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas) + assertTrue(TestUtils.waitUntilTrue(() => getPartitionDirCountInBrokers(servers, topic, 0) == 2, 5000)) servers.foreach(_.shutdown()) } diff --git 
a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala index 5a312bf..a764237 100644 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -196,7 +196,9 @@ class CleanerTest extends JUnitSuite { def makeLog(dir: File = dir, config: LogConfig = logConfig) = new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) - + + def checkDone(topicAndPartition: TopicAndPartition) { /* do nothing */ } + def makeCleaner(capacity: Int) = new Cleaner(id = 0, offsetMap = new FakeOffsetMap(capacity), @@ -204,7 +206,8 @@ class CleanerTest extends JUnitSuite { maxIoBufferSize = 64*1024, dupBufferLoadFactor = 0.75, throttler = throttler, - time = time) + time = time, + checkDone = checkDone ) def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = { for((key, value) <- seq)
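The new `checkDone: (TopicAndPartition) => Unit` hook is what makes a clean interruptible: the production `CleanerThread` throws `AllDoneException` when the thread is shutting down and `CancelledCleaningException` when `cancelCleaning()` has flagged the partition, while the test above plugs in a no-op. A hypothetical test hook (not part of the patch; the class name and counter are invented for illustration) that aborts a clean after a fixed number of progress checks might look like:

```scala
import java.util.concurrent.atomic.AtomicInteger
import kafka.common.{CancelledCleaningException, TopicAndPartition}

// Hypothetical test helper: lets a fixed number of progress checks through,
// then aborts the clean by throwing CancelledCleaningException, so a test can
// exercise the cancellation path without a concurrent deleteLog() call.
class CancellingCheckDone(allowedChecks: Int) {
  private val checks = new AtomicInteger(0)

  def checkDone(topicAndPartition: TopicAndPartition) {
    if (checks.incrementAndGet() > allowedChecks)
      throw new CancelledCleaningException(
        "The cleaning for partition %s is cancelled".format(topicAndPartition))
  }
}
```

Such a hook could be passed as the `checkDone` argument of the `Cleaner` constructor, in place of the no-op used in `CleanerTest` above, to exercise the cancellation handling in `cleanInto()` and `buildOffsetMap()`.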