Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(revision 1204274)
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(working copy)
@@ -53,10 +53,20 @@
 
   @Test
   def testCreateLog() {
-    val log = logManager.getOrCreateLog("kafka", 0)
+    val name = "kafka"
+    val log = logManager.getOrCreateLog(name, 0)
+    val logFile = new File(config.logDir, name + "-0")
+    assertTrue(logFile.exists)
     log.append(TestUtils.singleMessageSet("test".getBytes()))
   }
 
+  @Test
+  def testGetLog() {
+    val name = "kafka"
+    val log = logManager.getLog(name, 0)
+    val logFile = new File(config.logDir, name + "-0")
+    assertTrue(!logFile.exists)
+  }
 
 
   @Test
   def testCleanupExpiredSegments() {
Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1204274)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -29,6 +29,7 @@
 import kafka.serializer.StringDecoder
 import kafka.utils.TestUtils
 import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet}
+import java.io.File
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -259,4 +260,12 @@
     for((topic, resp) <- topics.zip(response.toList))
       TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
   }
+
+  def testConsumerNotExistTopic() {
+    val newTopic = "new-topic"
+    val messageSetIter = consumer.fetch(new FetchRequest(newTopic, 0, 0, 10000)).iterator
+    assertTrue(messageSetIter.hasNext == false)
+    val logFile = new File(config.logDir, newTopic + "-0")
+    assertTrue(!logFile.exists)
+  }
 }
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1204274)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -135,7 +135,7 @@
     startupLatch.await
   }
 
-  def registerNewTopicInZK(topic: String) {
+  private def registerNewTopicInZK(topic: String) {
     if (config.enableZookeeper)
       zkActor ! topic
   }
@@ -151,15 +151,10 @@
       }
     }
 
-
-  def chooseRandomPartition(topic: String): Int = {
-    random.nextInt(topicPartitionsMap.getOrElse(topic, numPartitions))
-  }
-
   /**
-   * Create the log if it does not exist, if it exists just return it
+   * Return the Pool (partitions) for a specific log
    */
-  def getOrCreateLog(topic: String, partition: Int): Log = {
+  private def getLogPool(topic: String, partition: Int): Pool[Int, Log] = {
     awaitStartup
     if (topic.length <= 0)
       throw new InvalidTopicException("topic name can't be empty")
@@ -168,8 +163,31 @@
           (topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")")
       throw new InvalidPartitionException("wrong partition " + partition)
     }
+    logs.get(topic)
+  }
+
+  /**
+   * Pick a random partition from the given topic
+   */
+  def chooseRandomPartition(topic: String): Int = {
+    random.nextInt(topicPartitionsMap.getOrElse(topic, numPartitions))
+  }
+
+  /**
+   * Get the log if exists
+   */
+  def getLog(topic: String, partition: Int): Log = {
+    val parts = getLogPool(topic, partition)
+    if (parts == null) return null
+    parts.get(partition)
+  }
+
+  /**
+   * Create the log if it does not exist, if it exists just return it
+   */
+  def getOrCreateLog(topic: String, partition: Int): Log = {
     var hasNewTopic = false
-    var parts = logs.get(topic)
+    var parts = getLogPool(topic, partition)
     if (parts == null) {
       val found = logs.putIfNotExists(topic, new Pool[Int, Log])
       if (found == null)
Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala	(revision 1204274)
+++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala	(working copy)
@@ -112,8 +112,11 @@
     var response: MessageSetSend = null
     try {
       logger.trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition)
-      val log = logManager.getOrCreateLog(fetchRequest.topic, fetchRequest.partition)
-      response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
+      val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition)
+      if (log != null)
+        response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
+      else
+        response = new MessageSetSend()
     }
     catch {
       case e =>
Index: core/src/main/scala/kafka/server/MessageSetSend.scala
===================================================================
--- core/src/main/scala/kafka/server/MessageSetSend.scala	(revision 1204274)
+++ core/src/main/scala/kafka/server/MessageSetSend.scala	(working copy)
@@ -42,6 +42,8 @@
 
   def this(messages: MessageSet) = this(messages, ErrorMapping.NoError)
 
+  def this() = this(MessageSet.Empty)
+
   def writeTo(channel: WritableByteChannel): Int = {
     expectIncomplete()
     var written = 0
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 1204274)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -194,7 +194,6 @@
 
       // register on broker partition path changes
       val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topic
-      ZkUtils.makeSurePersistentPathExists(zkClient, partitionPath)
      zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
    }
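
For reference, a minimal sketch (not part of the patch) of the produce/fetch split
this change introduces. It assumes an initialized kafka.log.LogManager in scope as
`logManager`; the topic names and the offset/maxSize values are placeholders:

    // Produce path: getOrCreateLog materializes the log (and its on-disk
    // directory) on first use; only this path may register a new topic in ZK.
    val produceLog = logManager.getOrCreateLog("my-topic", 0)
    produceLog.append(TestUtils.singleMessageSet("hello".getBytes()))

    // Fetch path: getLog has no side effects and returns null for an unknown
    // topic or partition, so the request handler answers with an empty set.
    val fetchLog = logManager.getLog("unknown-topic", 0)
    val response =
      if (fetchLog != null)
        new MessageSetSend(fetchLog.read(0, 10000))
      else
        new MessageSetSend() // no-arg constructor added by this patch

The behavioral point: fetching a nonexistent topic no longer creates a log
directory (or a ZooKeeper path) as a side effect, which is what testGetLog and
testConsumerNotExistTopic assert.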