From 305f9e0e02eca942af9e8e17d001002e8f9ce19a Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Wed, 17 Dec 2014 17:14:29 -0800 Subject: [PATCH 1/2] first pass at log clean fix --- core/src/main/scala/kafka/log/LogCleaner.scala | 7 +++++++ core/src/main/scala/kafka/log/LogCleanerManager.scala | 11 ++++++++--- core/src/main/scala/kafka/log/LogManager.scala | 4 +++- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index f8fcb84..1af98c8 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -130,6 +130,13 @@ class LogCleaner(val config: CleanerConfig, } /** + * Update checkpoint file, removing topics and partitions that no longer exist + */ + def updateCheckpoints(dataDir: File) { + cleanerManager.updateCheckpoints(dataDir,None); + } + + /** * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition. * This call blocks until the cleaning of the partition is aborted and paused. */ diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index bcfef77..1d0f7dc 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -199,6 +199,13 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To } } + def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) { + val checkpoint = checkpoints(dataDir) + val existing = checkpoint.read().filterKeys(logs.keys) + val offsets = existing ++ update + checkpoint.write(existing) + } + /** * Save out the endOffset and remove the given log from the in-progress set, if not aborted. 
*/ @@ -206,9 +213,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To inLock(lock) { inProgress(topicAndPartition) match { case LogCleaningInProgress => - val checkpoint = checkpoints(dataDir) - val offsets = checkpoint.read() + ((topicAndPartition, endOffset)) - checkpoint.write(offsets) + updateCheckpoints(dataDir,Option(topicAndPartition, endOffset)) inProgress.remove(topicAndPartition) case LogCleaningAborted => inProgress.put(topicAndPartition, LogCleaningPaused) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 4d2924d..f4574da 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -370,8 +370,10 @@ class LogManager(val logDirs: Array[File], } if (removedLog != null) { //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. - if (cleaner != null) + if (cleaner != null) { cleaner.abortCleaning(topicAndPartition) + cleaner.updateCheckpoints(removedLog.dir.getParentFile) + } removedLog.delete() info("Deleted log for partition [%s,%d] in %s." 
.format(topicAndPartition.topic, -- 1.9.3 (Apple Git-50) From 5c1bd54b488ef2d104f7e3cd12683dffc4d47f47 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Thu, 18 Dec 2014 10:58:23 -0800 Subject: [PATCH 2/2] added locking --- core/src/main/scala/kafka/log/LogCleanerManager.scala | 10 ++++++---- core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala | 5 ++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 1d0f7dc..5e62b33 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -200,10 +200,12 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To } def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) { - val checkpoint = checkpoints(dataDir) - val existing = checkpoint.read().filterKeys(logs.keys) - val offsets = existing ++ update - checkpoint.write(existing) + // lock is reentrant, so it's safe to take it again in this context + inLock(lock) { + val checkpoint = checkpoints(dataDir) + val existing = checkpoint.read().filterKeys(logs.keys) ++ update + checkpoint.write(existing) + } } /** diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 29cc01b..39f9dcd 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -228,7 +228,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topicAndPartition = TopicAndPartition(topic, 0) val brokerConfigs = TestUtils.createBrokerConfigs(3, false) - brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) + brokerConfigs.foreach(p => { + p.setProperty("delete.topic.enable", "true") + 
p.setProperty("log.cleaner.enable","true") + }) // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic -- 1.9.3 (Apple Git-50)