diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index c403770..ce8209c 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -80,6 +80,22 @@ private[log] object Log { nf.setGroupingUsed(false) nf.format(offset) + Log.FileSuffix } + + def getEmptyOffsets(request: OffsetRequest): Array[Long] = { + var size = 0 + request.time match { + case OffsetRequest.LatestTime => + size = 1 + case OffsetRequest.EarliestTime => + size = 1 + case _ => + if (request.time == SystemTime.milliseconds) + size = 1 + } + val ret = new Array[Long](size) + for (i <- 0 until size) ret(i) = 0 + ret + } } /** @@ -352,7 +368,7 @@ private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, } ret } - + def getTopicName():String = { name.substring(0, name.lastIndexOf("-")) } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index dd9fc78..c7cb3ba 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -25,6 +25,7 @@ import scala.collection._ import java.util.concurrent.CountDownLatch import kafka.server.{KafkaConfig, KafkaZooKeeper} import kafka.common.{InvalidTopicException, InvalidPartitionException} +import kafka.api.OffsetRequest /** * The guy who creates and hands out logs @@ -173,6 +174,21 @@ private[kafka] class LogManager(val config: KafkaConfig, random.nextInt(topicPartitionsMap.getOrElse(topic, numPartitions)) } + /** + * Get a Log or empty Log but do not create file + */ + def getLogOrGenerateFake(topic: String, partition: Int): Log = { + val log = getLog(topic, partition) + if (log != null) return log + new Log(null, 0, 0, false) + } + + def getOffsets(offsetRequest: OffsetRequest): Array[Long] = { + val log = getLog(offsetRequest.topic, offsetRequest.partition) + if (log != null) return log.getOffsetsBefore(offsetRequest) + Log.getEmptyOffsets(offsetRequest) + } + /** * Get the log if exists */ diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala b/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala index 64cdf05..9f76a02 100644 --- a/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala @@ -130,8 +130,7 @@ private[kafka] class KafkaRequestHandlers(val logManager: LogManager) { val offsetRequest = OffsetRequest.readFrom(request.buffer) if(requestLogger.isTraceEnabled) requestLogger.trace("Offset request " + offsetRequest.toString) - val log = logManager.getOrCreateLog(offsetRequest.topic, offsetRequest.partition) - val offsets = log.getOffsetsBefore(offsetRequest) + val offsets = logManager.getOffsets(offsetRequest) val response = new OffsetArraySend(offsets) Some(response) } diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala index a6419a9..d95bf2e 100644 --- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala @@ -19,8 +19,7 @@ package kafka.log import junit.framework.TestCase import java.io.File -import kafka.utils.TestUtils -import kafka.utils.Utils +import kafka.utils._ import kafka.server.{KafkaConfig, KafkaServer} import junit.framework.Assert._ import java.util.{Random, Properties} @@ -30,6 +29,7 @@ import kafka.consumer.SimpleConsumer import org.scalatest.junit.JUnitSuite import org.junit.{After, Before, Test} import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} +import org.apache.log4j._ object LogOffsetTest { val random = new Random() @@ -43,6 +43,8 @@ class LogOffsetTest extends JUnitSuite { val brokerPort: Int = 9099 var simpleConsumer: SimpleConsumer = null + private val logger = Logger.getLogger(classOf[LogOffsetTest]) + @Before def setUp() { val config: Properties = createBrokerConfig(1, brokerPort) @@ -66,21 +68,34 @@ class LogOffsetTest extends JUnitSuite { new FetchRequest("test", 0, 0, 300 * 1024)) assertFalse(messageSet.iterator.hasNext) + val name = "test" + val logFile = new File(logDir, name + "-0") + { - val offsets = simpleConsumer.getOffsetsBefore("test", 0, OffsetRequest.LatestTime, 10) + val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.LatestTime, 10) assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) ) + assertTrue(!logFile.exists()) } { - val offsets = simpleConsumer.getOffsetsBefore("test", 0, OffsetRequest.EarliestTime, 10) + val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.EarliestTime, 10) assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) ) + assertTrue(!logFile.exists()) } { - val offsets = simpleConsumer.getOffsetsBefore("test", 0, 1295978400000L, 10) - assertTrue( 0 == offsets.length ) + val offsets = simpleConsumer.getOffsetsBefore(name, 0, 1295978400000L, 10) + assertEquals( 0, offsets.length ) + assertTrue(!logFile.exists()) } + + { + val offsets = simpleConsumer.getOffsetsBefore(name, 0, SystemTime.milliseconds, 10) + assertEquals( 1, offsets.length ) + assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) ) + assertTrue(!logFile.exists()) + } } @Test