diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0cc402b..8cd489e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -60,7 +60,7 @@ class Log(val dir: File,
   private val lastflushedTime = new AtomicLong(time.milliseconds)
 
   /* the actual segments of the log */
-  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
+  private val segments: ConcurrentNavigableMap[Long, LogSegment] = new ConcurrentSkipListMap[Long, LogSegment]
   loadSegments()
 
   /* The number of times the log has been truncated */
@@ -170,7 +170,7 @@ class Log(val dir: File,
       this.recoveryPoint = lastOffset
       return
     }
-    val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
+    val unflushed = logSegments(segments.floorKey(this.recoveryPoint), Long.MaxValue).iterator
     while(unflushed.hasNext) {
       val curr = unflushed.next
       info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
@@ -597,11 +597,11 @@ class Log(val dir: File,
   def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
     import JavaConversions._
     lock synchronized {
-      val floor = segments.floorKey(from)
+      val floor: java.lang.Long = segments.floorKey(from)
       if(floor eq null)
-        segments.headMap(to).values
+        asIterable(segments.headMap(to).values)
       else
-        segments.subMap(floor, true, to, false).values
+        asIterable(segments.subMap(floor.longValue, true, to, false).values)
     }
   }
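A note on the Log.scala hunks: even with the map declared as ConcurrentNavigableMap[Long, LogSegment], the underlying Java collection boxes its keys, so floorKey still hands back a nullable java.lang.Long. The rewritten logSegments therefore tests for null before unboxing and converts the Java views explicitly. Below is a minimal standalone sketch of that pattern (the object and method names are invented for illustration; only the floorKey/headMap/subMap calls come from the patch):

import java.util.concurrent.ConcurrentSkipListMap

object FloorKeyDemo {
  def main(args: Array[String]): Unit = {
    val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
    segments.put(0L, "segment-0")
    segments.put(100L, "segment-100")

    // floorKey returns the greatest key <= from, or null when no such key exists;
    // the null test must precede any unboxing, since unboxing null throws an NPE
    def segmentsFrom(from: Long, to: Long): java.util.Collection[String] = {
      val floor: java.lang.Long = segments.floorKey(from)
      if (floor eq null)
        segments.headMap(to).values
      else
        segments.subMap(floor.longValue, true, to, false).values
    }

    println(segmentsFrom(50L, Long.MaxValue))  // [segment-0, segment-100]
  }
}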
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 77d7ec0..e81a8dd 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -42,9 +42,10 @@ object RequestChannel extends Logging {
   }
 
  case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
-    @volatile var dequeueTimeMs = -1L
+    @volatile var requestDequeueTimeMs = -1L
     @volatile var apiLocalCompleteTimeMs = -1L
     @volatile var responseCompleteTimeMs = -1L
+    @volatile var responseDequeueTimeMs = -1L
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer = null
@@ -57,10 +58,11 @@ object RequestChannel extends Logging {
       // processing time is really small. In this case, use responseCompleteTimeMs as apiLocalCompleteTimeMs.
       if (apiLocalCompleteTimeMs < 0)
         apiLocalCompleteTimeMs = responseCompleteTimeMs
-      val queueTime = (dequeueTimeMs - startTimeMs).max(0L)
-      val apiLocalTime = (apiLocalCompleteTimeMs - dequeueTimeMs).max(0L)
+      val requestQueueTime = (requestDequeueTimeMs - startTimeMs).max(0L)
+      val apiLocalTime = (apiLocalCompleteTimeMs - requestDequeueTimeMs).max(0L)
       val apiRemoteTime = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L)
-      val responseSendTime = (endTimeMs - responseCompleteTimeMs).max(0L)
+      val responseQueueTime = (responseDequeueTimeMs - responseCompleteTimeMs).max(0L)
+      val responseSendTime = (endTimeMs - responseDequeueTimeMs).max(0L)
       val totalTime = endTimeMs - startTimeMs
       var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId)))
       if (requestId == RequestKeys.FetchKey) {
@@ -72,15 +74,16 @@ object RequestChannel extends Logging {
       }
       metricsList.foreach{
         m => m.requestRate.mark()
-             m.queueTimeHist.update(queueTime)
+             m.requestQueueTimeHist.update(requestQueueTime)
              m.localTimeHist.update(apiLocalTime)
              m.remoteTimeHist.update(apiRemoteTime)
+             m.responseQueueTimeHist.update(responseQueueTime)
              m.responseSendTimeHist.update(responseSendTime)
              m.totalTimeHist.update(totalTime)
       }
 
       if(requestLogger.isTraceEnabled)
-        requestLogger.trace("Completed request:%s from client %s;totalTime:%d,queueTime:%d,localTime:%d,remoteTime:%d,sendTime:%d"
-          .format(requestObj, remoteAddress, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
+        requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
+          .format(requestObj, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
     }
   }
 
@@ -154,8 +157,12 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
     requestQueue.take()
 
   /** Get a response for the given processor if there is one */
-  def receiveResponse(processor: Int): RequestChannel.Response =
-    responseQueues(processor).poll()
+  def receiveResponse(processor: Int): RequestChannel.Response = {
+    val response = responseQueues(processor).poll()
+    if (response != null)
+      response.request.responseDequeueTimeMs = SystemTime.milliseconds
+    response
+  }
 
   def addResponseListener(onResponse: Int => Unit) {
     responseListeners ::= onResponse
@@ -177,11 +184,13 @@ object RequestMetrics {
 }
 
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
   val requestRate = newMeter(name + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
   // time a request spent in a request queue
-  val queueTimeHist = newHistogram(name + "-QueueTimeMs")
+  val requestQueueTimeHist = newHistogram(name + "-RequestQueueTimeMs")
   // time a request takes to be processed at the local broker
   val localTimeHist = newHistogram(name + "-LocalTimeMs")
   // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
   val remoteTimeHist = newHistogram(name + "-RemoteTimeMs")
+  // time a response spent in a response queue
+  val responseQueueTimeHist = newHistogram(name + "-ResponseQueueTimeMs")
   // time to send the response to the requester
   val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs")
   val totalTimeHist = newHistogram(name + "-TotalTimeMs")
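With responseDequeueTimeMs recorded, the old queue/local/remote/send breakdown becomes five non-overlapping components. This standalone sketch (timestamps and object name invented; the subtraction formulas are the ones from updateRequestMetrics above) checks that the components partition totalTime exactly, which is what makes the new ResponseQueueTimeMs histogram additive rather than double-counted:

object RequestTimingDemo {
  def main(args: Array[String]): Unit = {
    val startTimeMs            = 1000L  // request fully read off the socket
    val requestDequeueTimeMs   = 1003L  // handler thread takes it from the request queue
    val apiLocalCompleteTimeMs = 1010L  // local processing finished
    val responseCompleteTimeMs = 1025L  // remote waits (e.g. fetch/produce acks) finished
    val responseDequeueTimeMs  = 1027L  // network thread takes it from the response queue
    val endTimeMs              = 1030L  // last byte of the response written

    val requestQueueTime  = (requestDequeueTimeMs - startTimeMs).max(0L)              // 3
    val apiLocalTime      = (apiLocalCompleteTimeMs - requestDequeueTimeMs).max(0L)   // 7
    val apiRemoteTime     = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L) // 15
    val responseQueueTime = (responseDequeueTimeMs - responseCompleteTimeMs).max(0L)  // 2
    val responseSendTime  = (endTimeMs - responseDequeueTimeMs).max(0L)               // 3
    val totalTime         = endTimeMs - startTimeMs                                   // 30

    // every millisecond between start and end lands in exactly one bucket
    assert(requestQueueTime + apiLocalTime + apiRemoteTime + responseQueueTime + responseSendTime == totalTime)
    println(s"total=$totalTime = $requestQueueTime+$apiLocalTime+$apiRemoteTime+$responseQueueTime+$responseSendTime")
  }
}

Note also that receiveResponse guards the timestamp assignment: poll() returns null when the processor's response queue is empty, and the caller uses that null to stop draining, so response.request must not be dereferenced unconditionally.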
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 74442b6..ebd171f 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -31,17 +31,6 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
     this(new VerifiableProperties(originalProps))
     props.verify()
   }
-
-  private def getLogRetentionTimeMillis(): Long = {
-    var millisInMinute = 60L * 1000L
-    val millisInHour = 60L * millisInMinute
-    if(props.containsKey("log.retention.minutes")){
-      millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
-    } else {
-      millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
-    }
-
-  }
 
   /*********** General Configuration ***********/
 
@@ -103,7 +92,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   val logRollHoursPerTopicMap = props.getMap("log.roll.hours.per.topic", _.toInt > 0).mapValues(_.toInt)
 
   /* the number of hours to keep a log file before deleting it */
-  val logRetentionTimeMillis = getLogRetentionTimeMillis
+  val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
 
   /* the number of hours to keep a log file before deleting it for some specific topic*/
   val logRetentionHoursPerTopicMap = props.getMap("log.retention.hours.per.topic", _.toInt > 0).mapValues(_.toInt)
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 6d562c2..d0f05cb 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -37,7 +37,7 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha
             id, brokerId))
           return
         }
-        req.dequeueTimeMs = SystemTime.milliseconds
+        req.requestDequeueTimeMs = SystemTime.milliseconds
         trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
         apis.handle(req)
       } catch {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5e35a89..c148fdf 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -253,7 +253,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                    flushInterval = config.logFlushIntervalMessages,
                    flushMs = config.logFlushIntervalMs.toLong,
                    retentionSize = config.logRetentionBytes,
-                   retentionMs = config.logRetentionTimeMillis,
+                   retentionMs = 60L * 60L * 1000L * config.logRetentionHours,
                    maxMessageSize = config.messageMaxBytes,
                    maxIndexSize = config.logIndexSizeMaxBytes,
                    indexInterval = config.logIndexIntervalBytes,
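The config change reverts retention to an hours-only knob and moves the hours-to-milliseconds conversion to the KafkaServer call site. Writing the product as 60L * 60L * 1000L * hours keeps the whole computation in Long arithmetic from the first operand, so even the maximum value getIntInRange allows for log.retention.hours (Int.MaxValue) cannot overflow. A quick sketch of the arithmetic (object name invented):

object RetentionMathDemo {
  def main(args: Array[String]): Unit = {
    val logRetentionHours = 24 * 7                           // the default: one week
    val retentionMs = 60L * 60L * 1000L * logRetentionHours  // hours -> ms, Long throughout
    println(retentionMs)                                     // 604800000
    assert(retentionMs == 7L * 24L * 60L * 60L * 1000L)

    // had the multiplication been done in Int, large settings would wrap around:
    val hoursNearMax = Int.MaxValue
    assert(60L * 60L * 1000L * hoursNearMax > 0L)            // fine as Long
    assert(60 * 60 * 1000 * hoursNearMax < 0)                // overflows as Int
  }
}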
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
deleted file mode 100644
index 2f75e1d..0000000
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import org.junit.Test
-import junit.framework.Assert._
-import org.scalatest.junit.JUnit3Suite
-import kafka.utils.TestUtils
-
-class KafkaConfigTest extends JUnit3Suite {
-
-  @Test
-  def testLogRetentionTimeHoursProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
-    props.put("log.retention.hours", "1")
-
-    val cfg = new KafkaConfig(props)
-    assertEquals(60L * 60L * 1000L, cfg.logRetentionTimeMillis)
-
-  }
-
-  @Test
-  def testLogRetentionTimeMinutesProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
-    props.put("log.retention.minutes", "30")
-
-    val cfg = new KafkaConfig(props)
-    assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
-
-  }
-
-  @Test
-  def testLogRetentionTimeNoConfigProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
-
-    val cfg = new KafkaConfig(props)
-    assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis)
-
-  }
-
-  @Test
-  def testLogRetentionTimeBothMinutesAndHoursProvided() {
-    val props = TestUtils.createBrokerConfig(0, 8181)
-    props.put("log.retention.minutes", "30")
-    props.put("log.retention.hours", "1")
-
-    val cfg = new KafkaConfig(props)
-    assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
-
-  }
-
-}
\ No newline at end of file
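The deleted suite only exercised logRetentionTimeMillis, which no longer exists after this patch, so it is removed wholesale rather than rewritten. If coverage for the surviving setting were wanted, a minimal replacement in the same style might look like the following (hypothetical test, not part of the patch; the class name is invented, while TestUtils.createBrokerConfig and the JUnit3Suite pattern are reused from the deleted file):

package kafka.server

import org.junit.Test
import junit.framework.Assert._
import org.scalatest.junit.JUnit3Suite
import kafka.utils.TestUtils

class LogRetentionHoursTest extends JUnit3Suite {

  @Test
  def testLogRetentionHoursDefault() {
    val props = TestUtils.createBrokerConfig(0, 8181)
    val cfg = new KafkaConfig(props)
    // the default defined in KafkaConfig is one week, expressed in hours
    assertEquals(24 * 7, cfg.logRetentionHours)
  }

  @Test
  def testLogRetentionHoursProvided() {
    val props = TestUtils.createBrokerConfig(0, 8181)
    props.put("log.retention.hours", "1")
    val cfg = new KafkaConfig(props)
    assertEquals(1, cfg.logRetentionHours)
  }
}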