diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 02ccc17..350141c 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -29,6 +29,7 @@ import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
 import org.apache.log4j.Logger
 import kafka.message.ByteBufferMessageSet
 import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping}
+import java.io.IOException
 
 
 /**
@@ -135,6 +136,22 @@ class Partition(val topic: String,
     assignedReplicaMap.remove(replicaId)
   }
 
+  def delete() {
+    // need to hold the lock to prevent appendMessagesToLeader() from hitting I/O exceptions due to the log being deleted
+    leaderIsrUpdateLock synchronized {
+      assignedReplicaMap.clear()
+      inSyncReplicas = Set.empty[Replica]
+      leaderReplicaIdOpt = None
+      try {
+        logManager.deleteLog(TopicAndPartition(topic, partitionId))
+      } catch {
+        case e: IOException =>
+          fatal("Error deleting the log for partition [%s,%d]".format(topic, partitionId), e)
+          Runtime.getRuntime().halt(1)
+      }
+    }
+  }
+
   def getLeaderEpoch(): Int = {
     leaderIsrUpdateLock synchronized {
       return this.leaderEpoch
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 1883a53..18f4649 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -525,8 +525,11 @@ class Log(val dir: File,
    * Completely delete this log directory and all contents from the file system with no delay
    */
   def delete(): Unit = {
-    logSegments.foreach(_.delete())
-    Utils.rm(dir)
+    lock synchronized {
+      logSegments.foreach(_.delete())
+      segments.clear()
+      Utils.rm(dir)
+    }
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 81be88a..874f977 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -50,7 +50,7 @@ class LogManager(val logDirs: Array[File],
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
   val LockFile = ".lock"
   val InitialTaskDelayMs = 30*1000
-  private val logCreationLock = new Object
+  private val logCreationOrDeletionLock = new Object
   private val logs = new Pool[TopicAndPartition, Log]()
 
   createAndValidateLogDirs(logDirs)
@@ -229,7 +229,7 @@ class LogManager(val logDirs: Array[File],
    * If the log already exists, just return a copy of the existing log
    */
   def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
-    logCreationLock synchronized {
+    logCreationOrDeletionLock synchronized {
       var log = logs.get(topicAndPartition)
 
       // check if the log has already been created in another thread
@@ -254,7 +254,23 @@ class LogManager(val logDirs: Array[File],
       log
     }
   }
-  
+
+  /**
+   * Delete a log.
+   */
+  def deleteLog(topicAndPartition: TopicAndPartition) {
+    logCreationOrDeletionLock synchronized {
+      val removedLog = logs.remove(topicAndPartition)
+      if (removedLog != null) {
+        removedLog.delete()
+        info("Deleted log for partition [%s,%d] in %s."
+             .format(topicAndPartition.topic,
+                     topicAndPartition.partition,
+                     removedLog.dir.getAbsolutePath))
+      }
+    }
+  }
+
   /**
    * Choose the next directory in which to create a log. Currently this is done
    * by calculating the number of partitions in each directory and then choosing the
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0d6926e..b323b0c 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -44,7 +44,14 @@ class LogSegment(val log: FileMessageSet,
                  time: Time) extends Logging {
 
   var created = time.milliseconds
-
+
+  /**
+   * A lock that synchronizes the deletion and the flushing of the segment.
+   * Note that the synchronization between the deletion and the append is already done at the
+   * ReplicaManager level, so append doesn't need to synchronize on this lock.
+   */
+  private val lock = new Object
+  private var deleted = false
+
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
 
@@ -234,9 +241,13 @@ class LogSegment(val log: FileMessageSet,
    */
   @threadsafe
   def flush() {
-    LogFlushStats.logFlushTimer.time {
-      log.flush()
-      index.flush()
+    lock synchronized {
+      if (!deleted) {
+        LogFlushStats.logFlushTimer.time {
+          log.flush()
+          index.flush()
+        }
+      }
     }
   }
 
@@ -265,12 +276,15 @@ class LogSegment(val log: FileMessageSet,
    * @throws KafkaStorageException if the delete fails.
    */
   def delete() {
-    val deletedLog = log.delete()
-    val deletedIndex = index.delete()
-    if(!deletedLog && log.file.exists)
-      throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
-    if(!deletedIndex && index.file.exists)
-      throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
+    lock synchronized {
+      val deletedLog = log.delete()
+      val deletedIndex = index.delete()
+      if(!deletedLog && log.file.exists)
+        throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
+      if(!deletedIndex && index.file.exists)
+        throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
+      deleted = true
+    }
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 161f581..f183545 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -115,16 +115,16 @@ class ReplicaManager(val config: KafkaConfig,
   def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
     stateChangeLogger.trace("Broker %d handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
     val errorCode = ErrorMapping.NoError
-    getReplica(topic, partitionId) match {
-      case Some(replica) =>
-        /* TODO: handle deleteLog in a better way */
-        //if (deletePartition)
-        //  logManager.deleteLog(topic, partition)
+    getPartition(topic, partitionId) match {
+      case Some(partition) =>
         leaderPartitionsLock synchronized {
-          leaderPartitions -= replica.partition
+          leaderPartitions -= partition
+        }
+        if(deletePartition) {
+          val removedPartition = allPartitions.remove((topic, partitionId))
+          if (removedPartition != null)
+            removedPartition.delete() // this will delete the local log
         }
-        if(deletePartition)
-          allPartitions.remove((topic, partitionId))
       case None => //do nothing if replica no longer exists
     }
     stateChangeLogger.trace("Broker %d finished handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
@@ -405,16 +405,19 @@ class ReplicaManager(val config: KafkaConfig,
    * Flushes the highwatermark value for all partitions to the highwatermark file
    */
   def checkpointHighWatermarks() {
-    val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
-    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
-    for((dir, reps) <- replicasByDir) {
-      val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
-      try {
-        highWatermarkCheckpoints(dir).write(hwms)
-      } catch {
-        case e: IOException =>
-          fatal("Error writing to highwatermark file: ", e)
-          Runtime.getRuntime().halt(1)
+    // need to hold the lock since partitions can be deleted concurrently
+    replicaStateChangeLock synchronized {
+      val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
+      val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
+      for((dir, reps) <- replicasByDir) {
+        val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
+        try {
+          highWatermarkCheckpoints(dir).write(hwms)
+        } catch {
+          case e: IOException =>
+            fatal("Error writing to highwatermark file: ", e)
+            Runtime.getRuntime().halt(1)
+        }
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index c30069e..9bd7297 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -23,9 +23,10 @@ import java.util.Properties
 import kafka.utils._
 import kafka.log._
 import kafka.zk.ZooKeeperTestHarness
-import kafka.server.KafkaConfig
 import kafka.utils.{Logging, ZkUtils, TestUtils}
-import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition}
+import kafka.common.{TopicExistsException, TopicAndPartition}
+import java.io.File
+import kafka.server.{KafkaServer, KafkaConfig}
 
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
@@ -132,6 +133,16 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
   }
 
+  private def getPartitionDirCountInBrokers(servers: Iterable[KafkaServer], topic: String, partitionId: Int) = {
+    servers.foldLeft(0) { (total, server) =>
+      val logFile = new File(server.config.logDirs.head, topic + "-" + partitionId)
+      if (logFile.exists())
+        total + 1
+      else
+        total
+    }
+  }
+
   @Test
   def testPartitionReassignmentWithLeaderInNewReplicas() {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
@@ -154,6 +165,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
+    assertTrue(TestUtils.waitUntilTrue(() => getPartitionDirCountInBrokers(servers, topic, 0) == 3, 5000))
     servers.foreach(_.shutdown())
   }
 
@@ -179,7 +191,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
-    // leader should be 2
+    assertTrue(TestUtils.waitUntilTrue(() => getPartitionDirCountInBrokers(servers, topic, 0) == 3, 5000))
     servers.foreach(_.shutdown())
   }
 
@@ -205,7 +217,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }, 2000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
-    // leader should be 2
+    assertTrue(TestUtils.waitUntilTrue(() => getPartitionDirCountInBrokers(servers, topic, 0) == 2, 5000))
     servers.foreach(_.shutdown())
   }
 
@@ -222,7 +234,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient)
     assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
-    // leader should be 2
     servers.foreach(_.shutdown())
   }
 
@@ -244,6 +255,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
+    assertTrue(TestUtils.waitUntilTrue(() => getPartitionDirCountInBrokers(servers, topic, 0) == 2, 5000))
     servers.foreach(_.shutdown())
   }