From f82518a85001307334132056fe32c28b452cf48b Mon Sep 17 00:00:00 2001
From: Joe Stein
Date: Tue, 6 Jan 2015 22:40:27 -0500
Subject: [PATCH 1/8] KAFKA-1512 Fixes for limit the maximum number of
 connections per ip address; patch by Jeff Holoman; reviewed by Jay Kreps
 and Gwen Shapira

---
 .../main/scala/kafka/network/SocketServer.scala    |  2 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  3 +-
 .../unit/kafka/network/SocketServerTest.scala      | 45 ++++++++++++++++------
 3 files changed, 36 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index e451592..39b1651 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -47,7 +47,7 @@ class SocketServer(val brokerId: Int,
                    val maxRequestSize: Int = Int.MaxValue,
                    val maxConnectionsPerIp: Int = Int.MaxValue,
                    val connectionsMaxIdleMs: Long,
-                   val maxConnectionsPerIpOverrides: Map[String, Int] = Map[String, Int]()) extends Logging with KafkaMetricsGroup {
+                   val maxConnectionsPerIpOverrides: Map[String, Int]) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Socket Server on Broker " + brokerId + "], "
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1bf7d10..1691ad7 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -94,7 +94,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                     config.socketReceiveBufferBytes,
                                     config.socketRequestMaxBytes,
                                     config.maxConnectionsPerIp,
-                                    config.connectionsMaxIdleMs)
+                                    config.connectionsMaxIdleMs,
+                                    config.maxConnectionsPerIpOverrides)
     socketServer.startup()
 
     replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 5f4d852..78b431f 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -30,6 +30,7 @@ import kafka.common.TopicAndPartition
 import kafka.message.ByteBufferMessageSet
 import java.nio.channels.SelectionKey
 import kafka.utils.TestUtils
+import scala.collection.Map
 
 class SocketServerTest extends JUnitSuite {
 
@@ -42,7 +43,8 @@ class SocketServerTest extends JUnitSuite {
                                             recvBufferSize = 300000,
                                             maxRequestSize = 50,
                                             maxConnectionsPerIp = 5,
-                                            connectionsMaxIdleMs = 60*1000)
+                                            connectionsMaxIdleMs = 60*1000,
+                                            maxConnectionsPerIpOverrides = Map.empty[String, Int])
   server.startup()
 
   def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
@@ -71,13 +73,12 @@ class SocketServerTest extends JUnitSuite {
     channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
   }
 
-  def connect() = new Socket("localhost", server.port)
+  def connect(s: SocketServer = server) = new Socket("localhost", s.port)
 
   @After
   def cleanup() {
     server.shutdown()
   }
-
   @Test
   def simpleRequest() {
     val socket = connect()
@@ -141,19 +142,39 @@ class SocketServerTest extends JUnitSuite {
     // doing a subsequent send should throw an exception as the connection should be closed.
     sendRequest(socket, 0, bytes)
   }
-
+
   @Test
   def testMaxConnectionsPerIp() {
     // make the maximum allowable number of connections and then leak them
     val conns = (0 until server.maxConnectionsPerIp).map(i => connect())
     // now try one more (should fail)
-    try {
-      val conn = connect()
-      sendRequest(conn, 100, "hello".getBytes)
-      assertEquals(-1, conn.getInputStream().read())
-    } catch {
-      case e: IOException => // this is good
-    }
+    val conn = connect()
+    conn.setSoTimeout(3000)
+    assertEquals(-1, conn.getInputStream().read())
+  }
+
+  @Test
+  def testMaxConnectionsPerIPOverrides(): Unit = {
+    val overrideNum = 6
+    val overrides: Map[String, Int] = Map("localhost" -> overrideNum)
+    val overrideServer: SocketServer = new SocketServer(0,
+                                                        host = null,
+                                                        port = kafka.utils.TestUtils.choosePort,
+                                                        numProcessorThreads = 1,
+                                                        maxQueuedRequests = 50,
+                                                        sendBufferSize = 300000,
+                                                        recvBufferSize = 300000,
+                                                        maxRequestSize = 50,
+                                                        maxConnectionsPerIp = 5,
+                                                        connectionsMaxIdleMs = 60*1000,
+                                                        maxConnectionsPerIpOverrides = overrides)
+    overrideServer.startup()
+    // make the maximum allowable number of connections and then leak them
+    val conns = (0 until overrideNum).map(i => connect(overrideServer))
+    // now try one more (should fail)
+    val conn = connect(overrideServer)
+    conn.setSoTimeout(3000)
+    assertEquals(-1, conn.getInputStream.read())
+    overrideServer.shutdown()
   }
 }
-- 
1.9.3 (Apple Git-50)
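
The behavior these tests pin down can also be observed from any plain TCP client: once maxConnectionsPerIp sockets (or the per-host override) are held open from one address, the broker accepts and immediately closes the next connection, so a blocking read on it returns -1. A minimal standalone probe sketching this; the object name, host, port, and limit of 5 are illustrative assumptions, not values read from any broker:

    import java.net.Socket

    // Open more sockets than the broker's per-IP limit allows and observe
    // that the over-limit connection is closed (read() returns -1).
    object ConnectionLimitProbe {
      def main(args: Array[String]): Unit = {
        val host  = "localhost"   // assumed broker host
        val port  = 9092          // assumed broker port
        val limit = 5             // assumed max connections per IP
        val held  = (0 until limit).map(_ => new Socket(host, port))
        val extra = new Socket(host, port)
        extra.setSoTimeout(3000)
        // The broker should close the over-limit connection right away.
        println(s"read() on over-limit connection: ${extra.getInputStream.read()}") // expect -1
        (held :+ extra).foreach(_.close())
      }
    }
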
From a93ef199b2375c422e35d82ac7aa3a2fdacc1e74 Mon Sep 17 00:00:00 2001
From: Jun Rao
Date: Fri, 9 Jan 2015 11:27:00 -0800
Subject: [PATCH 2/8] kafka-1797; (addressing Manikumar Reddy's comment) add
 the serializer/deserializer api to the new java client; patched by Jun Rao;
 reviewed by Manikumar Reddy and Neha Narkhede

---
 .../org/apache/kafka/common/serialization/StringDeserializer.java | 5 ++++-
 .../java/org/apache/kafka/common/serialization/StringSerializer.java | 5 ++++-
 .../org/apache/kafka/common/serialization/SerializationTest.java | 4 +++-
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
index a3b3700..9783ea0 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java
@@ -37,7 +37,10 @@ public class StringDeserializer implements Deserializer<String> {
     @Override
     public String deserialize(String topic, byte[] data) {
         try {
-            return new String(data, encoding);
+            if (data == null)
+                return null;
+            else
+                return new String(data, encoding);
         } catch (UnsupportedEncodingException e) {
             throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java
index 02db47f..636d905 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java
@@ -37,7 +37,10 @@ public class StringSerializer implements Serializer<String> {
     @Override
     public byte[] serialize(String topic, String data) {
         try {
-            return data.getBytes(encoding);
+            if (data == null)
+                return null;
+            else
+                return data.getBytes(encoding);
         } catch (UnsupportedEncodingException e) {
             throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
         }
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index d550a31..b6e1497 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -48,8 +48,10 @@ public class SerializationTest {
 
             assertEquals("Should get the original string after serialization and deserialization with encoding " + encoding,
                 str, deserializer.deserialize(mytopic, serializer.serialize(mytopic, str)));
-        }
 
+            assertEquals("Should support null in serialization and deserialization with encoding " + encoding,
+                null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null)));
+        }
     }
 
     private SerDeser<String> getStringSerDeser(String encoder) {
-- 
1.9.3 (Apple Git-50)
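
The contract this patch establishes is that null passes through both directions untouched: a null string serializes to null bytes, and null bytes deserialize to a null string, so a round trip preserves null. A minimal Scala sketch of that contract (the object name is hypothetical, and it fixes the charset up front via Charset rather than a String encoding name, which sidesteps the UnsupportedEncodingException handling in the real classes):

    import java.nio.charset.StandardCharsets

    // Null round-trips as null; non-null strings round-trip intact.
    object NullSafeStringCodec {
      def serialize(data: String): Array[Byte] =
        if (data == null) null else data.getBytes(StandardCharsets.UTF_8)

      def deserialize(data: Array[Byte]): String =
        if (data == null) null else new String(data, StandardCharsets.UTF_8)

      def main(args: Array[String]): Unit = {
        assert(serialize(null) == null)
        assert(deserialize(serialize(null)) == null)
        assert(deserialize(serialize("hello")) == "hello")
        println("null passes through serialize and deserialize unchanged")
      }
    }
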
From e52a6181bf0969f315ac0f0d325eac34d2b4a6ee Mon Sep 17 00:00:00 2001
From: Jun Rao
Date: Fri, 9 Jan 2015 11:33:48 -0800
Subject: [PATCH 3/8] kafka-1851; OffsetFetchRequest returns extra partitions
 when input only contains unknown partitions; patched by Jun Rao; reviewed by
 Neha Narkhede

---
 core/src/main/scala/kafka/server/KafkaApis.scala             | 6 +++++-
 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala | 9 ++++++++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2a1c032..c011a1b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -396,7 +396,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
     )
     val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
-    val knownStatus = offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
+    val knownStatus =
+      if (knownTopicPartitions.size > 0)
+        offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
+      else
+        Map.empty[TopicAndPartition, OffsetMetadataAndError]
     val status = unknownStatus ++ knownStatus
 
     val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId)
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 8c5364f..4a3a5b2 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -79,7 +79,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create the topic
     createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server))
 
-    val commitRequest = OffsetCommitRequest("test-group", immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
+    val commitRequest = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
     assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
@@ -109,6 +109,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata)
     assertEquals(100L, fetchResponse1.requestInfo.get(topicAndPartition).get.offset)
+
+    // Fetch an unknown topic and verify
+    val unknownTopicAndPartition = TopicAndPartition("unknownTopic", 0)
+    val fetchRequest2 = OffsetFetchRequest(group, Seq(unknownTopicAndPartition))
+    val fetchResponse2 = simpleConsumer.fetchOffsets(fetchRequest2)
+
+    assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get)
+    assertEquals(1, fetchResponse2.requestInfo.size)
   }
 
   @Test
-- 
1.9.3 (Apple Git-50)
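
The shape of the fix: partition the requested topic-partitions into known and unknown, and skip the offset-store lookup entirely when nothing is known, since a lookup over an empty list could otherwise hand back entries the caller never asked for. A simplified standalone model of that logic; the types and the in-memory store are stand-ins for illustration, not the actual KafkaApis or OffsetManager API:

    object OffsetFetchModel {
      case class TopicAndPartition(topic: String, partition: Int)
      sealed trait Status
      case object UnknownTopicOrPartition extends Status
      case class Offset(value: Long) extends Status

      def fetch(requested: Seq[TopicAndPartition],
                knownPartitions: Set[TopicAndPartition],
                store: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Status] = {
        val (known, unknown) = requested.partition(knownPartitions.contains)
        val unknownStatus: Map[TopicAndPartition, Status] =
          unknown.map(tp => tp -> (UnknownTopicOrPartition: Status)).toMap
        // The fix: only consult the store when there is something to look up,
        // so the response covers exactly the requested partitions.
        val knownStatus: Map[TopicAndPartition, Status] =
          if (known.nonEmpty)
            known.flatMap(tp => store.get(tp).map(o => tp -> (Offset(o): Status))).toMap
          else
            Map.empty
        unknownStatus ++ knownStatus
      }

      def main(args: Array[String]): Unit = {
        val store  = Map(TopicAndPartition("t", 0) -> 42L)
        val result = fetch(Seq(TopicAndPartition("unknownTopic", 0)),
                           Set(TopicAndPartition("t", 0)), store)
        // exactly one entry: the unknown partition, nothing extra
        assert(result == Map(TopicAndPartition("unknownTopic", 0) -> UnknownTopicOrPartition))
        println(result)
      }
    }
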
-      if (cleaner != null)
+      if (cleaner != null) {
         cleaner.abortCleaning(topicAndPartition)
+        cleaner.updateCheckpoints(removedLog.dir.getParentFile)
+      }
       removedLog.delete()
       info("Deleted log for partition [%s,%d] in %s."
         .format(topicAndPartition.topic,
-- 
1.9.3 (Apple Git-50)

From 4120c4443614a7c8788074c9bf5e776329ca95c6 Mon Sep 17 00:00:00 2001
From: Gwen Shapira
Date: Thu, 18 Dec 2014 10:58:23 -0800
Subject: [PATCH 5/8] added locking

---
 core/src/main/scala/kafka/log/LogCleanerManager.scala      | 10 ++++++----
 core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala |  5 ++++-
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 1d0f7dc..5e62b33 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -200,10 +200,12 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   }
 
   def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) {
-    val checkpoint = checkpoints(dataDir)
-    val existing = checkpoint.read().filterKeys(logs.keys)
-    val offsets = existing ++ update
-    checkpoint.write(existing)
+    // lock is reentrant, so its safe to take it again in this context
+    inLock(lock) {
+      val checkpoint = checkpoints(dataDir)
+      val existing = checkpoint.read().filterKeys(logs.keys) ++ update
+      checkpoint.write(existing)
+    }
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 29cc01b..39f9dcd 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -228,7 +228,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)
     val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+    brokerConfigs.foreach(p => {
+      p.setProperty("delete.topic.enable", "true")
+      p.setProperty("log.cleaner.enable","true")
+    })
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
-- 
1.9.3 (Apple Git-50)
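
Note the subtle correctness fix folded into this patch: patch 4's updateCheckpoints computed offsets = existing ++ update but then wrote existing, silently dropping the update; this version merges the update into what actually gets written, and takes the (reentrant) lock around the whole read-filter-merge-write sequence. That pattern, reduced to plain Scala with an in-memory stand-in for kafka's OffsetCheckpoint (all names here are illustrative):

    import java.util.concurrent.locks.ReentrantLock

    // Under a reentrant lock: read the checkpoint, drop entries for logs that
    // no longer exist, merge in the optional new entry, write the result back.
    object CheckpointPattern {
      final class FileCheckpoint(private var contents: Map[String, Long]) {
        def read(): Map[String, Long] = contents
        def write(offsets: Map[String, Long]): Unit = contents = offsets
      }

      private val lock = new ReentrantLock()

      def updateCheckpoints(checkpoint: FileCheckpoint,
                            liveLogs: Set[String],
                            update: Option[(String, Long)]): Unit = {
        lock.lock() // reentrant, so safe even if the caller already holds it
        try {
          val existing = checkpoint.read().filter { case (k, _) => liveLogs(k) } ++ update
          checkpoint.write(existing)
        } finally lock.unlock()
      }

      def main(args: Array[String]): Unit = {
        val cp = new FileCheckpoint(Map("deleted-topic-0" -> 100L, "live-topic-0" -> 5L))
        updateCheckpoints(cp, liveLogs = Set("live-topic-0"), update = Some("live-topic-0" -> 7L))
        assert(!cp.read().contains("deleted-topic-0")) // dead partition purged
        assert(cp.read()("live-topic-0") == 7L)        // update not dropped
        println(cp.read())
      }
    }
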
From dbee1f38c96955a9244b9c71f9fcf67fb5e3e349 Mon Sep 17 00:00:00 2001
From: Gwen Shapira
Date: Fri, 26 Dec 2014 13:55:58 -0800
Subject: [PATCH 6/8] improved tests per Joel and Neha's suggestions

---
 core/src/main/scala/kafka/log/LogCleaner.scala             |  6 +++---
 core/src/main/scala/kafka/log/LogCleanerManager.scala      |  6 ++++--
 core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala |  5 +----
 .../scala/unit/kafka/log/LogCleanerIntegrationTest.scala   | 14 ++++++++++++++
 4 files changed, 22 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 1af98c8..f8e7cd5 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -71,8 +71,8 @@ class LogCleaner(val config: CleanerConfig,
                  val logs: Pool[TopicAndPartition, Log],
                  time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
 
-  /* for managing the state of partitions being cleaned. */
-  private val cleanerManager = new LogCleanerManager(logDirs, logs);
+  /* for managing the state of partitions being cleaned. package-private to allow access in tests */
+  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs);
 
   /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
   private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
@@ -133,7 +133,7 @@ class LogCleaner(val config: CleanerConfig,
    * Update checkpoint file, removing topics and partitions that no longer exist
    */
   def updateCheckpoints(dataDir: File) {
-    cleanerManager.updateCheckpoints(dataDir,None);
+    cleanerManager.updateCheckpoints(dataDir, update=None);
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 5e62b33..84b1571 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -44,9 +44,12 @@ private[log] case object LogCleaningPaused extends LogCleaningState
 private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
 
   override val loggerName = classOf[LogCleaner].getName
+
+  // package-private for testing
+  val offsetCheckpointFile = "cleaner-offset-checkpoint"
 
   /* the offset checkpoints holding the last cleaned point for each log */
-  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
+  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
 
   /* the set of logs currently being cleaned */
   private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
@@ -200,7 +203,6 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   }
 
   def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) {
-    // lock is reentrant, so its safe to take it again in this context
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       val existing = checkpoint.read().filterKeys(logs.keys) ++ update
       checkpoint.write(existing)
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 39f9dcd..29cc01b 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -228,10 +228,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)
     val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
-    brokerConfigs.foreach(p => {
-      p.setProperty("delete.topic.enable", "true")
-      p.setProperty("log.cleaner.enable","true")
-    })
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 5bfa764..07acd46 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -18,6 +18,8 @@ package kafka.log
 
 import java.io.File
 
+import kafka.server.OffsetCheckpoint
+
 import scala.collection._
 import org.junit._
 import kafka.common.TopicAndPartition
@@ -62,6 +64,18 @@ class LogCleanerIntegrationTest extends JUnitSuite {
     cleaner.awaitCleaned("log", 0, lastCleaned2)
     val read2 = readFromLog(log)
     assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)
+
+    // simulate deleting a partition, by removing it from logs
+    // force a checkpoint
+    // and make sure it's gone from the checkpoint file
+
+    cleaner.logs.remove(topics(0))
+
+    cleaner.updateCheckpoints(logDir)
+    val checkpoints = new OffsetCheckpoint(new File(logDir, cleaner.cleanerManager.offsetCheckpointFile)).read()
+
+    // we expect partition 0 to be gone
+    assert(!checkpoints.contains(topics(0)))
 
     cleaner.shutdown()
   }
-- 
1.9.3 (Apple Git-50)
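
This integration test can assert on the checkpoint file immediately after forcing an update, but the broker-level test added in the next patch has to poll instead, since topic deletion and checkpoint rewriting complete asynchronously. The helper it relies on is TestUtils.waitUntilTrue from the Kafka test tree; a self-contained equivalent, where the object name and the timeout/pause defaults are illustrative:

    // Poll until a condition becomes true or a deadline passes, instead of
    // asserting once and racing the asynchronous work.
    object WaitUntil {
      def waitUntilTrue(condition: () => Boolean, msg: String,
                        timeoutMs: Long = 15000L, pauseMs: Long = 100L): Unit = {
        val deadline = System.currentTimeMillis() + timeoutMs
        while (!condition()) {
          if (System.currentTimeMillis() > deadline)
            throw new AssertionError(msg)
          Thread.sleep(pauseMs)
        }
      }

      def main(args: Array[String]): Unit = {
        val start = System.currentTimeMillis()
        waitUntilTrue(() => System.currentTimeMillis() - start > 300, "condition never held")
        println("condition became true before the timeout")
      }
    }
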
p.setProperty("delete.topic.enable", "true") + ) + createTestTopicAndCluster(topic,brokerConfigs) + } + + private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties]): Seq[KafkaServer] = { val expectedReplicaAssignment = Map(0 -> List(0, 1, 2)) val topicAndPartition = TopicAndPartition(topic, 0) - val brokerConfigs = TestUtils.createBrokerConfigs(3, false) - brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) // create brokers val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b))) // create the topic @@ -254,4 +299,14 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { assertTrue("Replica logs not deleted after delete topic is complete", servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) } + + private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = { + var counter = 0 + for(dup <- 0 until numDups; key <- 0 until numKeys) yield { + val count = counter + log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true) + counter += 1 + (key, count) + } + } } -- 1.9.3 (Apple Git-50) From 8f0578c6300d68427d6d42e66676bfd43c00c30c Mon Sep 17 00:00:00 2001 From: Gwen Shapira Date: Mon, 12 Jan 2015 10:32:44 -0800 Subject: [PATCH 8/8] Fixes to DeleteTopicTest: clean up servers after cleaner test and move cleaner verification to the validation function --- .../test/scala/unit/kafka/admin/DeleteTopicTest.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index 6d75463..680184c 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -253,14 +253,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { AdminUtils.deleteTopic(zkClient, "test") verifyTopicDeletion("test", servers) - // ensure that topic is removed from all cleaner offsets - TestUtils.waitUntilTrue(() => - { - val topicAndPartition = TopicAndPartition(topic,0) - val logdir = server.getLogManager().logDirs(0) - val checkpoints = new OffsetCheckpoint(new File(logdir,"cleaner-offset-checkpoint")).read() - !checkpoints.contains(topicAndPartition) - }, "Cleaner offset for deleted partition should have been removed") + servers.foreach(_.shutdown()) } private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = { @@ -298,6 +291,15 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness { // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper assertTrue("Replica logs not deleted after delete topic is complete", servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) + // ensure that topic is removed from all cleaner offsets + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res,server) => res && + { + val topicAndPartition = TopicAndPartition(topic,0) + val logdir = server.getLogManager().logDirs(0) + val checkpoints = new OffsetCheckpoint(new File(logdir,"cleaner-offset-checkpoint")).read() + !checkpoints.contains(topicAndPartition) + }), + "Cleaner offset for deleted partition should have been removed") } private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = { -- 1.9.3 (Apple Git-50)