Index: core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(revision 1204274)
+++ core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(working copy)
@@ -17,10 +17,8 @@
 package kafka.log
 
-import junit.framework.TestCase
 import java.io.File
-import kafka.utils.TestUtils
-import kafka.utils.Utils
+import kafka.utils._
 import kafka.server.{KafkaConfig, KafkaServer}
 import junit.framework.Assert._
 import java.util.{Random, Properties}
@@ -30,6 +28,7 @@
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import org.apache.log4j._
 
 object LogOffsetTest {
   val random = new Random()
@@ -43,6 +42,8 @@
   val brokerPort: Int = 9099
   var simpleConsumer: SimpleConsumer = null
 
+  private val logger = Logger.getLogger(classOf[LogOffsetTest])
+
   @Before
   def setUp() {
     val config: Properties = createBrokerConfig(1, brokerPort)
@@ -66,21 +67,26 @@
       new FetchRequest("test", 0, 0, 300 * 1024))
     assertFalse(messageSet.iterator.hasNext)
 
+    val name = "test"
+    val logFile = new File(logDir, name + "-0")
+
     {
-      val offsets = simpleConsumer.getOffsetsBefore("test", 0, OffsetRequest.LatestTime, 10)
+      val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.LatestTime, 10)
       assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+      assertTrue(!logFile.exists())
     }
 
     {
-      val offsets = simpleConsumer.getOffsetsBefore("test", 0, OffsetRequest.EarliestTime, 10)
+      val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.EarliestTime, 10)
       assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+      assertTrue(!logFile.exists())
     }
 
     {
-      val offsets = simpleConsumer.getOffsetsBefore("test", 0, 1295978400000L, 10)
-      assertTrue( 0 == offsets.length )
+      val offsets = simpleConsumer.getOffsetsBefore(name, 0, SystemTime.milliseconds, 10)
+      assertEquals( 0, offsets.length )
+      assertTrue(!logFile.exists())
     }
-
   }
 
   @Test
Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(revision 1204274)
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(working copy)
@@ -53,10 +53,20 @@
 
   @Test
   def testCreateLog() {
-    val log = logManager.getOrCreateLog("kafka", 0)
+    val name = "kafka"
+    val log = logManager.getOrCreateLog(name, 0)
+    val logFile = new File(config.logDir, name + "-0")
+    assertTrue(logFile.exists)
     log.append(TestUtils.singleMessageSet("test".getBytes()))
   }
 
+  @Test
+  def testGetLog() {
+    val name = "kafka"
+    val log = logManager.getLog(name, 0)
+    val logFile = new File(config.logDir, name + "-0")
+    assertTrue(!logFile.exists)
+  }
+
   @Test
   def testCleanupExpiredSegments() {
Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1204274)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -29,6 +29,7 @@
 import kafka.serializer.StringDecoder
 import kafka.utils.TestUtils
 import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet}
+import java.io.File
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -259,4 +260,12 @@
     for((topic, resp) <- topics.zip(response.toList))
       TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
   }
+
+  def testConsumerNotExistTopic() {
+    val newTopic = "new-topic"
+    val messageSetIter = consumer.fetch(new FetchRequest(newTopic, 0, 0, 10000)).iterator
+    assertTrue(messageSetIter.hasNext == false)
+    val logFile = new File(config.logDir, newTopic + "-0")
+    assertTrue(!logFile.exists)
+  }
 }
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1204274)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -17,11 +17,9 @@
 package kafka.log
 
-import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.atomic._
 import java.text.NumberFormat
 import java.io._
-import java.nio.channels.FileChannel
 import org.apache.log4j._
 import kafka.message._
 import kafka.utils._
@@ -80,6 +78,13 @@
     nf.setGroupingUsed(false)
     nf.format(offset) + Log.FileSuffix
   }
+
+  def getEmptyOffsets(request: OffsetRequest): Array[Long] = {
+    if (request.time == OffsetRequest.LatestTime || request.time == OffsetRequest.EarliestTime)
+      return Array(0L)
+    else
+      return Array()
+  }
 }
 
 /**
@@ -352,7 +357,7 @@
     }
     ret
   }
-  
+
   def getTopicName():String = {
     name.substring(0, name.lastIndexOf("-"))
   }
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1204274)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -25,6 +25,7 @@
 import java.util.concurrent.CountDownLatch
 import kafka.server.{KafkaConfig, KafkaZooKeeper}
 import kafka.common.{InvalidTopicException, InvalidPartitionException}
+import kafka.api.OffsetRequest
 
 /**
  * The guy who creates and hands out logs
@@ -135,7 +136,7 @@
     startupLatch.await
   }
 
-  def registerNewTopicInZK(topic: String) {
+  private def registerNewTopicInZK(topic: String) {
    if (config.enableZookeeper)
      zkActor ! topic
   }
@@ -151,15 +152,10 @@
     }
   }
 
-
-  def chooseRandomPartition(topic: String): Int = {
-    random.nextInt(topicPartitionsMap.getOrElse(topic, numPartitions))
-  }
-
   /**
-   * Create the log if it does not exist, if it exists just return it
+   * Return the Pool (partitions) for a specific log
    */
-  def getOrCreateLog(topic: String, partition: Int): Log = {
+  private def getLogPool(topic: String, partition: Int): Pool[Int, Log] = {
     awaitStartup
     if (topic.length <= 0)
       throw new InvalidTopicException("topic name can't be empty")
@@ -168,8 +164,37 @@
         (topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")")
       throw new InvalidPartitionException("wrong partition " + partition)
     }
+    logs.get(topic)
+  }
+
+  /**
+   * Pick a random partition from the given topic
+   */
+  def chooseRandomPartition(topic: String): Int = {
+    random.nextInt(topicPartitionsMap.getOrElse(topic, numPartitions))
+  }
+
+  def getOffsets(offsetRequest: OffsetRequest): Array[Long] = {
+    val log = getLog(offsetRequest.topic, offsetRequest.partition)
+    if (log != null) return log.getOffsetsBefore(offsetRequest)
+    Log.getEmptyOffsets(offsetRequest)
+  }
+
+  /**
+   * Get the log if exists
+   */
+  def getLog(topic: String, partition: Int): Log = {
+    val parts = getLogPool(topic, partition)
+    if (parts == null) return null
+    parts.get(partition)
+  }
+
+  /**
+   * Create the log if it does not exist, if it exists just return it
+   */
+  def getOrCreateLog(topic: String, partition: Int): Log = {
     var hasNewTopic = false
-    var parts = logs.get(topic)
+    var parts = getLogPool(topic, partition)
     if (parts == null) {
       val found = logs.putIfNotExists(topic, new Pool[Int, Log])
       if (found == null)
Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala	(revision 1204274)
+++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala	(working copy)
@@ -17,17 +17,13 @@
 package kafka.server
 
-import java.nio.channels._
 import org.apache.log4j.Logger
-import kafka.producer._
-import kafka.consumer._
 import kafka.log._
 import kafka.network._
 import kafka.message._
-import kafka.server._
 import kafka.api._
 import kafka.common.ErrorMapping
-import kafka.utils.{Utils, SystemTime}
+import kafka.utils.SystemTime
 import java.io.IOException
 
 /**
@@ -112,8 +108,11 @@
     var response: MessageSetSend = null
     try {
       logger.trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition)
-      val log = logManager.getOrCreateLog(fetchRequest.topic, fetchRequest.partition)
-      response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
+      val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition)
+      if (log != null)
+        response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
+      else
+        response = new MessageSetSend()
     }
     catch {
       case e =>
@@ -127,8 +126,7 @@
     val offsetRequest = OffsetRequest.readFrom(request.buffer)
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Offset request " + offsetRequest.toString)
-    val log = logManager.getOrCreateLog(offsetRequest.topic, offsetRequest.partition)
-    val offsets = log.getOffsetsBefore(offsetRequest)
+    val offsets = logManager.getOffsets(offsetRequest)
     val response = new OffsetArraySend(offsets)
     Some(response)
   }
Index: core/src/main/scala/kafka/server/MessageSetSend.scala
===================================================================
--- core/src/main/scala/kafka/server/MessageSetSend.scala	(revision 1204274)
+++ core/src/main/scala/kafka/server/MessageSetSend.scala	(working copy)
@@ -42,6 +42,8 @@
 
   def this(messages: MessageSet) = this(messages, ErrorMapping.NoError)
 
+  def this() = this(MessageSet.Empty)
+
   def writeTo(channel: WritableByteChannel): Int = {
     expectIncomplete()
     var written = 0
Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 1204274)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -194,7 +194,6 @@
 
     // register on broker partition path changes
     val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topic
-    ZkUtils.makeSurePersistentPathExists(zkClient, partitionPath)
    zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
  }
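
[Reviewer note, not part of the patch] The net behavioral change in this diff is that a fetch or offset request for a topic that has never been produced to no longer creates the topic's log on disk: LogManager.getLog only looks a log up and returns null when it is absent, handleFetchRequest answers a missing log with an empty MessageSetSend, and LogManager.getOffsets falls back to Log.getEmptyOffsets. The standalone Scala sketch below illustrates just that fallback; the sentinel constants are stand-in values assumed to match those defined in kafka.api.OffsetRequest and are not taken from this patch.

    // Sketch of the Log.getEmptyOffsets fallback, runnable on its own.
    object EmptyOffsetsSketch {
      val LatestTime = -1L    // assumed value of OffsetRequest.LatestTime
      val EarliestTime = -2L  // assumed value of OffsetRequest.EarliestTime

      // A LatestTime/EarliestTime query against a nonexistent log still
      // answers with the single offset 0; any concrete timestamp yields
      // an empty result.
      def getEmptyOffsets(time: Long): Array[Long] =
        if (time == LatestTime || time == EarliestTime) Array(0L)
        else Array[Long]()

      def main(args: Array[String]) {
        println(getEmptyOffsets(LatestTime).mkString(","))         // prints 0
        println(getEmptyOffsets(EarliestTime).mkString(","))       // prints 0
        println(getEmptyOffsets(System.currentTimeMillis).length)  // prints 0 (no offsets)
      }
    }

This mirrors the assertions in LogOffsetTest above: the LatestTime and EarliestTime queries on the empty "test" topic expect Array(0L), the SystemTime.milliseconds query expects an empty array, and all three now verify that no "test-0" log directory was created as a side effect.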