From 181e9e42453e0b4b23d7a7990b50d2f983a0b653 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Wed, 17 Dec 2014 17:14:29 -0800 Subject: [PATCH 1/3] first pass at log clean fix --- core/src/main/scala/kafka/log/LogCleaner.scala | 7 +++++++ core/src/main/scala/kafka/log/LogCleanerManager.scala | 11 ++++++++--- core/src/main/scala/kafka/log/LogManager.scala | 4 +++- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index f8fcb84..1af98c8 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -130,6 +130,13 @@ class LogCleaner(val config: CleanerConfig, } /** + * Update checkpoint file, removing topics and partitions that no longer exist + */ + def updateCheckpoints(dataDir: File) { + cleanerManager.updateCheckpoints(dataDir,None); + } + + /** * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition. * This call blocks until the cleaning of the partition is aborted and paused. */ diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index bcfef77..1d0f7dc 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -199,6 +199,13 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To } } + def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) { + val checkpoint = checkpoints(dataDir) + val existing = checkpoint.read().filterKeys(logs.keys) + val offsets = existing ++ update + checkpoint.write(existing) + } + /** * Save out the endOffset and remove the given log from the in-progress set, if not aborted. */ @@ -206,9 +213,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To inLock(lock) { inProgress(topicAndPartition) match { case LogCleaningInProgress => - val checkpoint = checkpoints(dataDir) - val offsets = checkpoint.read() + ((topicAndPartition, endOffset)) - checkpoint.write(offsets) + updateCheckpoints(dataDir,Option(topicAndPartition, endOffset)) inProgress.remove(topicAndPartition) case LogCleaningAborted => inProgress.put(topicAndPartition, LogCleaningPaused) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 4ebaae0..86fb202 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -370,8 +370,10 @@ class LogManager(val logDirs: Array[File], } if (removedLog != null) { //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. - if (cleaner != null) + if (cleaner != null) { cleaner.abortCleaning(topicAndPartition) + cleaner.updateCheckpoints(removedLog.dir.getParentFile) + } removedLog.delete() info("Deleted log for partition [%s,%d] in %s." .format(topicAndPartition.topic, -- 1.9.3 (Apple Git-50) From a9af42d11f71da7314887d8564f5cec302d8892c Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Thu, 18 Dec 2014 10:58:23 -0800 Subject: [PATCH 2/3] added locking --- core/src/main/scala/kafka/log/LogCleanerManager.scala | 10 ++++++---- core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala | 5 ++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 1d0f7dc..5e62b33 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -200,10 +200,12 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To } def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) { - val checkpoint = checkpoints(dataDir) - val existing = checkpoint.read().filterKeys(logs.keys) - val offsets = existing ++ update - checkpoint.write(existing) + // lock is reentrant, so its safe to take it again in this context + inLock(lock) { + val checkpoint = checkpoints(dataDir) + val existing = checkpoint.read().filterKeys(logs.keys) ++ update + checkpoint.write(existing) + } } /** diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 29cc01b..39f9dcd 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -228,7 +228,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topicAndPartition = TopicAndPartition(topic, 0) val brokerConfigs = TestUtils.createBrokerConfigs(3, false) - brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) + brokerConfigs.foreach(p => { + p.setProperty("delete.topic.enable", "true") + p.setProperty("log.cleaner.enable","true") + }) // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic -- 1.9.3 (Apple Git-50) From b3c8f951a5e2779e5d74b3175574fd672eb30591 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Fri, 26 Dec 2014 13:55:58 -0800 Subject: [PATCH 3/3] improved tests per Joel and Neha's suggestions --- core/src/main/scala/kafka/log/LogCleaner.scala | 6 +++--- core/src/main/scala/kafka/log/LogCleanerManager.scala | 6 ++++-- core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala | 5 +---- .../scala/unit/kafka/log/LogCleanerIntegrationTest.scala | 14 ++++++++++++++ 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 1af98c8..f8e7cd5 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -71,8 +71,8 @@ class LogCleaner(val config: CleanerConfig, val logs: Pool[TopicAndPartition, Log], time: Time = SystemTime) extends Logging with KafkaMetricsGroup { - /* for managing the state of partitions being cleaned. */ - private val cleanerManager = new LogCleanerManager(logDirs, logs); + /* for managing the state of partitions being cleaned. package-private to allow access in tests */ + private[log] val cleanerManager = new LogCleanerManager(logDirs, logs); /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */ private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, @@ -133,7 +133,7 @@ class LogCleaner(val config: CleanerConfig, * Update checkpoint file, removing topics and partitions that no longer exist */ def updateCheckpoints(dataDir: File) { - cleanerManager.updateCheckpoints(dataDir,None); + cleanerManager.updateCheckpoints(dataDir, update=None); } /** diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 5e62b33..84b1571 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -44,9 +44,12 @@ private[log] case object LogCleaningPaused extends LogCleaningState private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup { override val loggerName = classOf[LogCleaner].getName + + // package-private for testing + val offsetCheckpointFile = "cleaner-offset-checkpoint" /* the offset checkpoints holding the last cleaned point for each log */ - private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap + private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap /* the set of logs currently being cleaned */ private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]() @@ -200,7 +203,6 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To } def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) { - // lock is reentrant, so its safe to take it again in this context inLock(lock) { val checkpoint = checkpoints(dataDir) val existing = checkpoint.read().filterKeys(logs.keys) ++ update diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 39f9dcd..29cc01b 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -228,10 +228,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topicAndPartition = TopicAndPartition(topic, 0) val brokerConfigs = TestUtils.createBrokerConfigs(3, false) - brokerConfigs.foreach(p => { - p.setProperty("delete.topic.enable", "true") - p.setProperty("log.cleaner.enable","true") - }) + brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 5bfa764..07acd46 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -18,6 +18,8 @@ package kafka.log import java.io.File +import kafka.server.OffsetCheckpoint + import scala.collection._ import org.junit._ import kafka.common.TopicAndPartition @@ -62,6 +64,18 @@ class LogCleanerIntegrationTest extends JUnitSuite { cleaner.awaitCleaned("log", 0, lastCleaned2) val read2 = readFromLog(log) assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap) + + // simulate deleting a partition, by removing it from logs + // force a checkpoint + // and make sure its gone from checkpoint file + + cleaner.logs.remove(topics(0)) + + cleaner.updateCheckpoints(logDir) + val checkpoints = new OffsetCheckpoint(new File(logDir,cleaner.cleanerManager.offsetCheckpointFile)).read() + + // we expect partition 0 to be gone + assert(!checkpoints.contains(topics(0))) cleaner.shutdown() } -- 1.9.3 (Apple Git-50)