From 7ef4d25af0769394a847153efcc84df3ddfc625f Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Wed, 17 Dec 2014 17:14:29 -0800 Subject: [PATCH 1/6] first pass at log clean fix --- core/src/main/scala/kafka/log/LogCleaner.scala | 7 +++++++ core/src/main/scala/kafka/log/LogCleanerManager.scala | 11 ++++++++--- core/src/main/scala/kafka/log/LogManager.scala | 4 +++- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index f8fcb84..1af98c8 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -130,6 +130,13 @@ class LogCleaner(val config: CleanerConfig, } /** + * Update checkpoint file, removing topics and partitions that no longer exist + */ + def updateCheckpoints(dataDir: File) { + cleanerManager.updateCheckpoints(dataDir,None); + } + + /** * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition. * This call blocks until the cleaning of the partition is aborted and paused. */ diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index bcfef77..1d0f7dc 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -199,6 +199,13 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To } } + def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) { + val checkpoint = checkpoints(dataDir) + val existing = checkpoint.read().filterKeys(logs.keys) + val offsets = existing ++ update + checkpoint.write(existing) + } + /** * Save out the endOffset and remove the given log from the in-progress set, if not aborted. */ @@ -206,9 +213,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To inLock(lock) { inProgress(topicAndPartition) match { case LogCleaningInProgress => - val checkpoint = checkpoints(dataDir) - val offsets = checkpoint.read() + ((topicAndPartition, endOffset)) - checkpoint.write(offsets) + updateCheckpoints(dataDir,Option(topicAndPartition, endOffset)) inProgress.remove(topicAndPartition) case LogCleaningAborted => inProgress.put(topicAndPartition, LogCleaningPaused) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 4ebaae0..86fb202 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -370,8 +370,10 @@ class LogManager(val logDirs: Array[File], } if (removedLog != null) { //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it. - if (cleaner != null) + if (cleaner != null) { cleaner.abortCleaning(topicAndPartition) + cleaner.updateCheckpoints(removedLog.dir.getParentFile) + } removedLog.delete() info("Deleted log for partition [%s,%d] in %s." .format(topicAndPartition.topic, -- 1.9.3 (Apple Git-50) From 4120c4443614a7c8788074c9bf5e776329ca95c6 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Thu, 18 Dec 2014 10:58:23 -0800 Subject: [PATCH 2/6] added locking --- core/src/main/scala/kafka/log/LogCleanerManager.scala | 10 ++++++---- core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala | 5 ++++- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 1d0f7dc..5e62b33 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -200,10 +200,12 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To } def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) { - val checkpoint = checkpoints(dataDir) - val existing = checkpoint.read().filterKeys(logs.keys) - val offsets = existing ++ update - checkpoint.write(existing) + // lock is reentrant, so its safe to take it again in this context + inLock(lock) { + val checkpoint = checkpoints(dataDir) + val existing = checkpoint.read().filterKeys(logs.keys) ++ update + checkpoint.write(existing) + } } /** diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 29cc01b..39f9dcd 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -228,7 +228,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topicAndPartition = TopicAndPartition(topic, 0) val brokerConfigs = TestUtils.createBrokerConfigs(3, false) - brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) + brokerConfigs.foreach(p => { + p.setProperty("delete.topic.enable", "true") + p.setProperty("log.cleaner.enable","true") + }) // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic -- 1.9.3 (Apple Git-50) From dbee1f38c96955a9244b9c71f9fcf67fb5e3e349 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Fri, 26 Dec 2014 13:55:58 -0800 Subject: [PATCH 3/6] improved tests per Joel and Neha's suggestions --- core/src/main/scala/kafka/log/LogCleaner.scala | 6 +++--- core/src/main/scala/kafka/log/LogCleanerManager.scala | 6 ++++-- core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala | 5 +---- .../scala/unit/kafka/log/LogCleanerIntegrationTest.scala | 14 ++++++++++++++ 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 1af98c8..f8e7cd5 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -71,8 +71,8 @@ class LogCleaner(val config: CleanerConfig, val logs: Pool[TopicAndPartition, Log], time: Time = SystemTime) extends Logging with KafkaMetricsGroup { - /* for managing the state of partitions being cleaned. */ - private val cleanerManager = new LogCleanerManager(logDirs, logs); + /* for managing the state of partitions being cleaned. package-private to allow access in tests */ + private[log] val cleanerManager = new LogCleanerManager(logDirs, logs); /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */ private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, @@ -133,7 +133,7 @@ class LogCleaner(val config: CleanerConfig, * Update checkpoint file, removing topics and partitions that no longer exist */ def updateCheckpoints(dataDir: File) { - cleanerManager.updateCheckpoints(dataDir,None); + cleanerManager.updateCheckpoints(dataDir, update=None); } /** diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 5e62b33..84b1571 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -44,9 +44,12 @@ private[log] case object LogCleaningPaused extends LogCleaningState private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup { override val loggerName = classOf[LogCleaner].getName + + // package-private for testing + val offsetCheckpointFile = "cleaner-offset-checkpoint" /* the offset checkpoints holding the last cleaned point for each log */ - private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap + private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap /* the set of logs currently being cleaned */ private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]() @@ -200,7 +203,6 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To } def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) { - // lock is reentrant, so its safe to take it again in this context inLock(lock) { val checkpoint = checkpoints(dataDir) val existing = checkpoint.read().filterKeys(logs.keys) ++ update diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 39f9dcd..29cc01b 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -228,10 +228,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topicAndPartition = TopicAndPartition(topic, 0) val brokerConfigs = TestUtils.createBrokerConfigs(3, false) - brokerConfigs.foreach(p => { - p.setProperty("delete.topic.enable", "true") - p.setProperty("log.cleaner.enable","true") - }) + brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 5bfa764..07acd46 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -18,6 +18,8 @@ package kafka.log import java.io.File +import kafka.server.OffsetCheckpoint + import scala.collection._ import org.junit._ import kafka.common.TopicAndPartition @@ -62,6 +64,18 @@ class LogCleanerIntegrationTest extends JUnitSuite { cleaner.awaitCleaned("log", 0, lastCleaned2) val read2 = readFromLog(log) assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap) + + // simulate deleting a partition, by removing it from logs + // force a checkpoint + // and make sure its gone from checkpoint file + + cleaner.logs.remove(topics(0)) + + cleaner.updateCheckpoints(logDir) + val checkpoints = new OffsetCheckpoint(new File(logDir,cleaner.cleanerManager.offsetCheckpointFile)).read() + + // we expect partition 0 to be gone + assert(!checkpoints.contains(topics(0))) cleaner.shutdown() } -- 1.9.3 (Apple Git-50) From c5d80f5e490bac79119c6152e1ce71496fa48d27 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Tue, 30 Dec 2014 16:00:43 -0800 Subject: [PATCH 4/6] added cleaner test to DeleteTopicTest --- .../scala/unit/kafka/admin/DeleteTopicTest.scala | 61 ++++++++++++++++++++-- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 29cc01b..6d75463 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -16,11 +16,14 @@ */ package kafka.admin +import java.io.File + +import kafka.log.Log import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import junit.framework.Assert._ import kafka.utils.{ZkUtils, TestUtils} -import kafka.server.{KafkaServer, KafkaConfig} +import kafka.server.{OffsetCheckpoint, KafkaServer, KafkaConfig} import org.junit.Test import kafka.common._ import kafka.producer.{ProducerConfig, Producer} @@ -221,14 +224,56 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000) assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined) servers.foreach(_.shutdown()) + } + + @Test + def testDeleteTopicWithCleaner() { + val topicAndPartition = TopicAndPartition("test", 0) + val topic = topicAndPartition.topic + + val brokerConfigs = TestUtils.createBrokerConfigs(3, false) + brokerConfigs(0).setProperty("delete.topic.enable", "true") + brokerConfigs(0).setProperty("log.cleaner.enable","true") + brokerConfigs(0).setProperty("log.cleanup.policy","compact") + brokerConfigs(0).setProperty("log.segment.bytes","100") + brokerConfigs(0).setProperty("log.segment.delete.delay.ms","1000") + val servers = createTestTopicAndCluster(topic,brokerConfigs) + + // for simplicity, we are validating cleaner offsets on a single broker + val server = servers(0) + val log = server.logManager.getLog(topicAndPartition).get + + // write to the topic to activate cleaner + writeDups(numKeys = 100, numDups = 3,log) + val lastCleaned = log.activeSegment.baseOffset + // wait for cleaner to clean + Thread.sleep(1000) + // delete topic + AdminUtils.deleteTopic(zkClient, "test") + verifyTopicDeletion("test", servers) + + // ensure that topic is removed from all cleaner offsets + TestUtils.waitUntilTrue(() => + { + val topicAndPartition = TopicAndPartition(topic,0) + val logdir = server.getLogManager().logDirs(0) + val checkpoints = new OffsetCheckpoint(new File(logdir,"cleaner-offset-checkpoint")).read() + !checkpoints.contains(topicAndPartition) + }, "Cleaner offset for deleted partition should have been removed") } private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = { + + val brokerConfigs = TestUtils.createBrokerConfigs(3, false) + brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true") + ) + createTestTopicAndCluster(topic,brokerConfigs) + } + + private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties]): Seq[KafkaServer] = { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topicAndPartition = TopicAndPartition(topic, 0) - val brokerConfigs = TestUtils.createBrokerConfigs(3, false) - brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic @@ -254,4 +299,14 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { assertTrue("Replica logs not deleted after delete topic is complete", servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) } + + private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = { + var counter = 0 + for(dup <- 0 until numDups; key <- 0 until numKeys) yield { + val count = counter + log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true) + counter += 1 + (key, count) + } + } } -- 1.9.3 (Apple Git-50) From 8f0578c6300d68427d6d42e66676bfd43c00c30c Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Mon, 12 Jan 2015 10:32:44 -0800 Subject: [PATCH 5/6] Fixes to DeleteTopicTest: clean up servers after cleaner test and move cleaner verification to the validation function --- .../test/scala/unit/kafka/admin/DeleteTopicTest.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 6d75463..680184c 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -253,14 +253,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { AdminUtils.deleteTopic(zkClient, "test") verifyTopicDeletion("test", servers) - // ensure that topic is removed from all cleaner offsets - TestUtils.waitUntilTrue(() => - { - val topicAndPartition = TopicAndPartition(topic,0) - val logdir = server.getLogManager().logDirs(0) - val checkpoints = new OffsetCheckpoint(new File(logdir,"cleaner-offset-checkpoint")).read() - !checkpoints.contains(topicAndPartition) - }, "Cleaner offset for deleted partition should have been removed") + servers.foreach(_.shutdown()) } private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = { @@ -298,6 +291,15 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper assertTrue("Replica logs not deleted after delete topic is complete", servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) + // ensure that topic is removed from all cleaner offsets + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res,server) => res && + { + val topicAndPartition = TopicAndPartition(topic,0) + val logdir = server.getLogManager().logDirs(0) + val checkpoints = new OffsetCheckpoint(new File(logdir,"cleaner-offset-checkpoint")).read() + !checkpoints.contains(topicAndPartition) + }), + "Cleaner offset for deleted partition should have been removed") } private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = { -- 1.9.3 (Apple Git-50) From 4e93c7da6eae43b67e3b1bf288665cf0419bbb56 Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Mon, 12 Jan 2015 17:01:42 -0800 Subject: [PATCH 6/6] minor fixes suggested by Joel --- core/src/main/scala/kafka/log/LogCleanerManager.scala | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 5 +++-- core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala | 7 ++++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 84b1571..fd87d90 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -46,7 +46,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To override val loggerName = classOf[LogCleaner].getName // package-private for testing - val offsetCheckpointFile = "cleaner-offset-checkpoint" + private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint" /* the offset checkpoints holding the last cleaned point for each log */ private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 86fb202..47d250a 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -57,8 +57,9 @@ class LogManager(val logDirs: Array[File], private val dirLocks = lockLogDirs(logDirs) private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap loadLogs() - - private val cleaner: LogCleaner = + + // public, so we can access this from kafka.admin.DeleteTopicTest + val cleaner: LogCleaner = if(cleanerConfig.enableCleaner) new LogCleaner(cleanerConfig, logDirs, logs, time = time) else diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 680184c..33c2767 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -228,7 +228,8 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testDeleteTopicWithCleaner() { - val topicAndPartition = TopicAndPartition("test", 0) + val topicName = "test" + val topicAndPartition = TopicAndPartition(topicName, 0) val topic = topicAndPartition.topic val brokerConfigs = TestUtils.createBrokerConfigs(3, false) @@ -245,10 +246,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // write to the topic to activate cleaner writeDups(numKeys = 100, numDups = 3,log) - val lastCleaned = log.activeSegment.baseOffset // wait for cleaner to clean - Thread.sleep(1000) + server.logManager.cleaner.awaitCleaned(topicName,0,0) + // delete topic AdminUtils.deleteTopic(zkClient, "test") verifyTopicDeletion("test", servers) -- 1.9.3 (Apple Git-50)