diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
index c4aa5ff..7f669a1 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
@@ -16,15 +16,13 @@
  */
 package kafka.etl;
 
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
-import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.api.Request;
+import kafka.common.TopicAndPartition;
+import kafka.api.FetchRequest;
 import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
@@ -34,6 +32,14 @@ import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.MultipleOutputs;
 
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
 @SuppressWarnings({ "deprecation"})
 public class KafkaETLContext {
 
@@ -144,7 +150,7 @@ public class KafkaETLContext {
     public boolean fetchMore () throws IOException {
         if (!hasMore()) return false;
-        
+
         FetchRequest fetchRequest = new FetchRequestBuilder()
                 .correlationId(requestId)
                 .clientId(_request.clientId())
@@ -216,15 +222,23 @@ public class KafkaETLContext {
         /* get smallest and largest offsets*/
         long[] range = new long[2];
-        long[] startOffsets = _consumer.getOffsetsBefore(_request.getTopic(), _request.getPartition(),
-                OffsetRequest.EarliestTime(), 1);
+        TopicAndPartition topicAndPartition = new TopicAndPartition(_request.getTopic(), _request.getPartition());
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
+                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
+        OffsetRequest request = new OffsetRequest(
+                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId(), Request.NonFollowerId());
+        long[] startOffsets = _consumer.getOffsetsBefore(request).offsets(_request.getTopic(), _request.getPartition());
         if (startOffsets.length != 1)
             throw new IOException("input:" + _input + " Expect one smallest offset but get " + startOffsets.length);
         range[0] = startOffsets[0];
-        long[] endOffsets = _consumer.getOffsetsBefore(_request.getTopic(), _request.getPartition(),
-                OffsetRequest.LatestTime(), 1);
+        requestInfo.clear();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
+        request = new OffsetRequest(
+                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId(), Request.NonFollowerId());
+        long[] endOffsets = _consumer.getOffsetsBefore(request).offsets(_request.getTopic(), _request.getPartition());
         if (endOffsets.length != 1)
             throw new IOException("input:" + _input + " Expect one latest offset but get " + endOffsets.length);
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index d924779..41ac09c 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -30,8 +30,6 @@ object FetchRequest {
   val CurrentVersion = 1.shortValue()
   val DefaultCorrelationId = -1
   val DefaultClientId = ""
-  val DefaultReplicaId = -1
-  val NonFollowerId = DefaultReplicaId
   val DefaultMaxWait = 0
   val DefaultMinBytes = 0
@@ -60,7 +58,7 @@ object FetchRequest {
 case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                         correlationId: Int = FetchRequest.DefaultCorrelationId,
                         clientId: String = FetchRequest.DefaultClientId,
-                        replicaId: Int = FetchRequest.DefaultReplicaId,
+                        replicaId: Int = Request.DefaultReplicaId,
                         maxWait: Int = FetchRequest.DefaultMaxWait,
                         minBytes: Int = FetchRequest.DefaultMinBytes,
                         requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
@@ -113,7 +111,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
     })
   }
 
-  def isFromFollower = replicaId != FetchRequest.NonFollowerId
+  def isFromFollower = replicaId != Request.NonFollowerId
 
   def numPartitions = requestInfo.size
 }
@@ -124,7 +122,7 @@ class FetchRequestBuilder() {
   private var correlationId = FetchRequest.DefaultCorrelationId
   private val versionId = FetchRequest.CurrentVersion
   private var clientId = FetchRequest.DefaultClientId
-  private var replicaId = FetchRequest.DefaultReplicaId
+  private var replicaId = Request.DefaultReplicaId
   private var maxWait = FetchRequest.DefaultMaxWait
   private var minBytes = FetchRequest.DefaultMinBytes
   private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo]
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 51907ec..cf90a7a 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -19,6 +19,8 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import kafka.utils.Utils
+import kafka.common.TopicAndPartition
+
 
 object OffsetRequest {
   val CurrentVersion = 1.shortValue()
@@ -32,36 +34,67 @@ object OffsetRequest {
   def readFrom(buffer: ByteBuffer): OffsetRequest = {
     val versionId = buffer.getShort
     val clientId = Utils.readShortString(buffer)
-    val topic = Utils.readShortString(buffer, "UTF-8")
-    val partition = buffer.getInt()
-    val offset = buffer.getLong
-    val maxNumOffsets = buffer.getInt
-    new OffsetRequest(versionId, clientId, topic, partition, offset, maxNumOffsets)
+    val replicaId = buffer.getInt
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = Utils.readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        val time = buffer.getLong
+        val maxNumOffsets = buffer.getInt
+        (TopicAndPartition(topic, partitionId), PartitionOffsetRequestInfo(time, maxNumOffsets))
+      })
+    })
+    OffsetRequest(Map(pairs:_*), versionId = versionId, clientId = clientId, replicaId = replicaId)
   }
 }
 
-case class OffsetRequest(versionId: Short = OffsetRequest.CurrentVersion,
-                         clientId: String = OffsetRequest.DefaultClientId,
-                         topic: String,
-                         partition: Int,
-                         time: Long,
-                         maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
-  def this(topic: String, partition: Int, time: Long, maxNumOffsets: Int) =
-    this(OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, topic, partition, time, maxNumOffsets)
+case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
 
+case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
+                         versionId: Short = OffsetRequest.CurrentVersion,
+                         clientId: String = OffsetRequest.DefaultClientId,
+                         replicaId: Int = Request.DefaultReplicaId)
+  extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
+
+  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
-    Utils.writeShortString(buffer, topic)
-    buffer.putInt(partition)
-    buffer.putLong(time)
-    buffer.putInt(maxNumOffsets)
+    buffer.putInt(replicaId)
+
+    buffer.putInt(requestInfoGroupedByTopic.size) // topic count
+    requestInfoGroupedByTopic.foreach {
+      case((topic, partitionInfos)) =>
+        Utils.writeShortString(buffer, topic)
+        buffer.putInt(partitionInfos.size) // partition count
+        partitionInfos.foreach {
+          case (TopicAndPartition(_, partition), partitionInfo) =>
+            buffer.putInt(partition)
+            buffer.putLong(partitionInfo.time)
+            buffer.putInt(partitionInfo.maxNumOffsets)
+        }
+    }
   }
 
-  def sizeInBytes(): Int = 2 + (2 + clientId.length()) + (2 + topic.length) + 4 + 8 + 4
+  def sizeInBytes =
+    2 + /* versionId */
+    Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) +
+    4 + /* replicaId */
+    4 + /* topic count */
+    requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
+      val (topic, partitionInfos) = currTopic
+      foldedTopics +
+      Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+      4 + /* partition count */
+      partitionInfos.size * (
+        4 + /* partition */
+        8 + /* time */
+        4 /* maxNumOffsets */
+      )
+    })
 
-  override def toString(): String= "OffsetRequest(version:" + versionId + ", client id:" + clientId +
-    ", topic:" + topic + ", part:" + partition + ", time:" + time + ", maxNumOffsets:" + maxNumOffsets + ")"
+  def isFromFollower = replicaId != Request.NonFollowerId
 }
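
Usage sketch (not part of the patch): with this change a single kafka.api.OffsetRequest carries any number of topic-partitions, and versionId, clientId and replicaId all have defaults. Topic names and timestamps below are illustrative only.

    import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.TopicAndPartition

    // one entry per partition we want offsets for; the default replicaId
    // (Request.DefaultReplicaId) marks the request as coming from a non-follower
    val request = OffsetRequest(Map(
      TopicAndPartition("topic1", 0) -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1),
      TopicAndPartition("topic1", 1) -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
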
diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala
index 61d3440..242b496 100644
--- a/core/src/main/scala/kafka/api/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetResponse.scala
@@ -18,43 +18,77 @@ package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.common.ErrorMapping
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.utils.Utils
 
 
 object OffsetResponse {
+
   def readFrom(buffer: ByteBuffer): OffsetResponse = {
     val versionId = buffer.getShort
-    val errorCode = buffer.getShort
-    val offsetsSize = buffer.getInt
-    val offsets = new Array[Long](offsetsSize)
-    for( i <- 0 until offsetsSize) {
-      offsets(i) = buffer.getLong
-    }
-    new OffsetResponse(versionId, offsets, errorCode)
+    val numTopics = buffer.getInt
+    val pairs = (1 to numTopics).flatMap(_ => {
+      val topic = Utils.readShortString(buffer)
+      val numPartitions = buffer.getInt
+      (1 to numPartitions).map(_ => {
+        val partition = buffer.getInt
+        val error = buffer.getShort
+        val numOffsets = buffer.getInt
+        val offsets = (1 to numOffsets).map(_ => buffer.getLong)
+        (TopicAndPartition(topic, partition), PartitionOffsetsResponse(error, offsets))
+      })
+    })
+    OffsetResponse(versionId, Map(pairs:_*))
   }
+
 }
 
+
+case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long])
+
+
 case class OffsetResponse(versionId: Short,
-                          offsets: Array[Long],
-                          errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
-  val sizeInBytes = 2 + 2 + offsets.foldLeft(4)((sum, _) => sum + 8)
+                          partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse])
+  extends RequestOrResponse {
 
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
-    /* error code */
-    buffer.putShort(errorCode)
-    buffer.putInt(offsets.length)
-    offsets.foreach(buffer.putLong(_))
+  lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)
+
+  def hasError = partitionErrorAndOffsets.values.exists(_.error != ErrorMapping.NoError)
+
+  val sizeInBytes = {
+    2 + /* versionId */
+    4 + /* topic count */
+    offsetsGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
+      val (topic, errorAndOffsetsMap) = currTopic
+      foldedTopics +
+      Utils.shortStringLength(topic) +
+      4 + /* partition count */
+      errorAndOffsetsMap.foldLeft(0)((foldedPartitions, currPartition) => {
+        foldedPartitions +
+        4 + /* partition id */
+        2 + /* partition error */
+        4 + /* offset array length */
+        currPartition._2.offsets.size * 8 /* offset */
+      })
+    })
   }
 
-  // need to override case-class equals due to broken java-array equals()
-  override def equals(other: Any): Boolean = {
-    other match {
-      case that: OffsetResponse =>
-        ( versionId == that.versionId &&
-          errorCode == that.errorCode &&
-          offsets.toSeq == that.offsets.toSeq)
-      case _ => false
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putInt(offsetsGroupedByTopic.size) // topic count
+    offsetsGroupedByTopic.foreach {
+      case((topic, errorAndOffsetsMap)) =>
+        Utils.writeShortString(buffer, topic)
+        buffer.putInt(errorAndOffsetsMap.size) // partition count
+        errorAndOffsetsMap.foreach {
+          case((TopicAndPartition(_, partition), errorAndOffsets)) =>
+            buffer.putInt(partition)
+            buffer.putShort(errorAndOffsets.error)
+            buffer.putInt(errorAndOffsets.offsets.size) // offset array length
+            errorAndOffsets.offsets.foreach(buffer.putLong(_))
+        }
     }
   }
+
 }
+
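
Usage sketch (not part of the patch): the response is now keyed by partition rather than being one global error code plus a flat offset array, so callers inspect each PartitionOffsetsResponse. The helper name below is illustrative.

    import kafka.api.OffsetResponse
    import kafka.common.{ErrorMapping, TopicAndPartition}

    // newest offset for one partition, or None if that partition reported an error
    def firstOffset(response: OffsetResponse, tp: TopicAndPartition): Option[Long] = {
      val result = response.partitionErrorAndOffsets(tp)
      if (result.error == ErrorMapping.NoError)
        result.offsets.headOption   // offsets are returned largest first
      else
        None
    }
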
diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala
index b9093b7..ceb36e1 100644
--- a/core/src/main/scala/kafka/api/ProducerResponse.scala
+++ b/core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -66,12 +66,11 @@ case class ProducerResponse(versionId: Short,
       foldedTopics +
       Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
       4 + /* partition count for this topic */
-      currTopic._2.foldLeft (0) ((foldedPartitions, currPartition) => {
-        foldedPartitions +
+      currTopic._2.size * {
         4 + /* partition id */
         2 + /* error code */
         8 /* offset */
-      })
+      }
     })
   }
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index 611bb42..9b1b478 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -25,6 +25,12 @@ object RequestOrResponse {
 }
 
+object Request {
+  val DefaultReplicaId = -1
+  val NonFollowerId = DefaultReplicaId
+}
+
+
 private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
 
   def sizeInBytes: Int
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 01db46e..8dae7e4 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -20,7 +20,7 @@ package kafka.consumer
 import kafka.cluster.Broker
 import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{FetchRequest, OffsetRequest, PartitionData}
+import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, PartitionData}
 import kafka.common.TopicAndPartition
 
@@ -30,7 +30,7 @@ class ConsumerFetcherThread(name: String,
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker,
                                       socketTimeout = config.socketTimeoutMs, socketBufferSize = config.socketBufferSize,
                                       fetchSize = config.fetchSize,
-                                      fetcherBrokerId = FetchRequest.NonFollowerId, maxWait = config.maxFetchWaitMs,
+                                      fetcherBrokerId = Request.NonFollowerId, maxWait = config.maxFetchWaitMs,
                                       minBytes = config.minFetchBytes) {
 
   // process fetched data
@@ -50,12 +50,13 @@ class ConsumerFetcherThread(name: String,
       case OffsetRequest.LargestTimeString => startTimestamp = OffsetRequest.LatestTime
       case _ => startTimestamp = OffsetRequest.LatestTime
     }
-    val newOffset = simpleConsumer.getOffsetsBefore(topic, partitionId, startTimestamp, 1)(0)
-
+    val topicAndPartition = TopicAndPartition(topic, partitionId)
+    val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
+    val newOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
     val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionId))
     pti.resetFetchOffset(newOffset)
     pti.resetConsumeOffset(newOffset)
-    return newOffset
+    newOffset
  }
 
  // any logic for partitions whose leader has changed
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 00b4727..5c1af6f 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -20,10 +20,10 @@ package kafka.consumer
 import kafka.api._
 import kafka.network._
 import kafka.utils._
-import kafka.common.ErrorMapping
 import java.util.concurrent.TimeUnit
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 
+
 /**
  * A consumer of kafka messages
  */
@@ -110,18 +110,10 @@ class SimpleConsumer( val host: String,
 
   /**
    *  Get a list of valid offsets (up to maxSize) before the given time.
-   *  The result is a list of offsets, in descending order.
-   *
-   *  @param time: time in millisecs (-1, from the latest offset available, -2 from the smallest offset available)
-   *  @return an array of offsets
+   *  @param request a [[kafka.api.OffsetRequest]] object.
+   *  @return a [[kafka.api.OffsetResponse]] object.
    */
-  def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = {
-    val request = new OffsetRequest(topic, partition, time, maxNumOffsets)
-    val offsetResponse = OffsetResponse.readFrom(sendRequest(request).buffer)
-    // try to throw exception based on global error codes
-    ErrorMapping.maybeThrowException(offsetResponse.errorCode)
-    offsetResponse.offsets
-  }
+  def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)
 
   private def getOrMakeConnection() {
     if(!blockingChannel.isConnected) {
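
Usage sketch (not part of the patch): callers that used to pass (topic, partition, time, maxNumOffsets) now build the request themselves and read the per-partition result; the old global ErrorMapping check is replaced by the per-partition error code. Names below are illustrative.

    val tp = TopicAndPartition("my-topic", 0)
    val response = simpleConsumer.getOffsetsBefore(
      OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))))
    val earliestOffset = response.partitionErrorAndOffsets(tp).offsets.head
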
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 3283f08..f62e352 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -27,13 +27,14 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import java.net.InetAddress
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.api.OffsetRequest
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
 import java.util.UUID
 import kafka.serializer.Decoder
 import kafka.utils.ZkUtils._
-import kafka.common.{KafkaException, NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException}
+import kafka.common._
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
+import scala.Some
 
 
 /**
@@ -262,8 +263,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           }
           simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout, ConsumerConfig.SocketBufferSize)
-          val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1)
-          producedOffset = offsets(0)
+          val topicAndPartition = TopicAndPartition(topic, partitionId)
+          val request = OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
+          producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
         } catch {
           case e =>
             error("error in earliestOrLatestOffset() ", e)
diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala
index b4e3a06..307ca68 100644
--- a/core/src/main/scala/kafka/javaapi/Implicits.scala
+++ b/core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -30,6 +30,9 @@ private[javaapi] object Implicits extends Logging {
   implicit def toJavaTopicMetadataResponse(response: kafka.api.TopicMetadataResponse): kafka.javaapi.TopicMetadataResponse =
     new kafka.javaapi.TopicMetadataResponse(response)
 
+  implicit def toJavaOffsetResponse(response: kafka.api.OffsetResponse): kafka.javaapi.OffsetResponse =
+    new kafka.javaapi.OffsetResponse(response)
+
   implicit def optionToJavaRef[T](opt: Option[T]): T = {
     opt match {
       case Some(obj) => obj
diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
new file mode 100644
index 0000000..4ab0464
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.TopicAndPartition
+import kafka.api.PartitionOffsetRequestInfo
+import collection.JavaConversions
+import java.nio.ByteBuffer
+
+
+class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffsetRequestInfo],
+                    versionId: Short,
+                    clientId: String,
+                    replicaId: Int) {
+
+  val underlying = {
+    val scalaMap = JavaConversions.asMap(requestInfo).toMap
+    kafka.api.OffsetRequest(
+      requestInfo = scalaMap,
+      versionId = versionId,
+      clientId = clientId,
+      replicaId = replicaId
+    )
+  }
+
+
+  def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
+
+
+  def sizeInBytes = underlying.sizeInBytes
+
+
+  override def toString = underlying.toString
+
+
+  override def equals(other: Any) = canEqual(other) && {
+    val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetRequest]
+    this.underlying.equals(otherOffsetRequest.underlying)
+  }
+
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetRequest]
+
+
+  override def hashCode = underlying.hashCode
+
+}
diff --git a/core/src/main/scala/kafka/javaapi/OffsetResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
new file mode 100644
index 0000000..8b1847e
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.TopicAndPartition
+
+class OffsetResponse(private val underlying: kafka.api.OffsetResponse) {
+
+  def hasError = underlying.hasError
+
+
+  def errorCode(topic: String, partition: Int) =
+    underlying.partitionErrorAndOffsets(TopicAndPartition(topic, partition)).error
+
+
+  def offsets(topic: String, partition: Int) =
+    underlying.partitionErrorAndOffsets(TopicAndPartition(topic, partition)).offsets.toArray
+
+
+  override def equals(other: Any) = canEqual(other) && {
+    val otherOffsetResponse = other.asInstanceOf[kafka.javaapi.OffsetResponse]
+    this.underlying.equals(otherOffsetResponse.underlying)
+  }
+
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetResponse]
+
+
+  override def hashCode = underlying.hashCode
+
+
+  override def toString = underlying.toString
+
+}
diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
index 2b446c8..803ec4b 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
@@ -19,6 +19,8 @@ package kafka.javaapi.consumer
 
 import kafka.utils.threadsafe
 import kafka.javaapi.FetchResponse
+import kafka.javaapi.OffsetRequest
+
 
 /**
  * A consumer of kafka messages
@@ -67,13 +69,14 @@ class SimpleConsumer(val host: String,
 
   /**
    *  Get a list of valid offsets (up to maxSize) before the given time.
-   *  The result is a list of offsets, in descending order.
    *
-   *  @param time: time in millisecs (-1, from the latest offset available, -2 from the smallest offset available)
-   *  @return an array of offsets
+   *  @param request a [[kafka.javaapi.OffsetRequest]] object.
+   *  @return a [[kafka.javaapi.OffsetResponse]] object.
   */
-  def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] =
-    underlying.getOffsetsBefore(topic, partition, time, maxNumOffsets)
+  def getOffsetsBefore(request: OffsetRequest): kafka.javaapi.OffsetResponse = {
+    import kafka.javaapi.Implicits._
+    underlying.getOffsetsBefore(request.underlying)
+  }
 
   def close() {
     underlying.close
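
Usage sketch (not part of the patch): the javaapi wrappers take a java.util.Map and explicit versionId/clientId/replicaId, since there are no default arguments on the Java side. Shown here in Scala against the javaapi types; "my-topic" and javaConsumer are illustrative.

    val info = new java.util.HashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
    info.put(new TopicAndPartition("my-topic", 0),
             new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime, 1))
    val request = new kafka.javaapi.OffsetRequest(info, kafka.api.OffsetRequest.CurrentVersion,
                                                  kafka.api.OffsetRequest.DefaultClientId, Request.NonFollowerId)
    val offsets: Array[Long] = javaConsumer.getOffsetsBefore(request).offsets("my-topic", 0)
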
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 2f86626..1c01df8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -81,12 +81,10 @@ object Log {
     nf.format(offset) + FileSuffix
   }
 
-  def getEmptyOffsets(request: OffsetRequest): Array[Long] = {
-    if (request.time == OffsetRequest.LatestTime || request.time == OffsetRequest.EarliestTime)
-      return Array(0L)
-    else
-      return Array()
-  }
+  def getEmptyOffsets(timestamp: Long): Seq[Long] =
+    if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
+      Seq(0L)
+    else Nil
 }
 
@@ -389,7 +387,7 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag
     }
   }
 
-  def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
+  def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     val segsArray = segments.view
     var offsetTimeArray: Array[(Long, Long)] = null
     if (segsArray.last.size > 0)
@@ -403,7 +401,7 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag
       offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.sizeInBytes(), time.milliseconds)
 
     var startIndex = -1
-    request.time match {
+    timestamp match {
       case OffsetRequest.LatestTime =>
         startIndex = offsetTimeArray.length - 1
       case OffsetRequest.EarliestTime =>
@@ -413,20 +411,21 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag
         debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
         startIndex = offsetTimeArray.length - 1
         while (startIndex >= 0 && !isFound) {
-          if (offsetTimeArray(startIndex)._2 <= request.time)
+          if (offsetTimeArray(startIndex)._2 <= timestamp)
             isFound = true
           else
             startIndex -=1
         }
     }
 
-    val retSize = request.maxNumOffsets.min(startIndex + 1)
+    val retSize = maxNumOffsets.min(startIndex + 1)
     val ret = new Array[Long](retSize)
     for (j <- 0 until retSize) {
       ret(j) = offsetTimeArray(startIndex)._1
       startIndex -= 1
     }
-    ret
+    // ensure that the returned seq is in descending order of offsets
+    ret.toSeq.sortBy(- _)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 45680e4..29f402d 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -23,7 +23,7 @@ import scala.collection._
 import kafka.server.KafkaConfig
 import kafka.api.OffsetRequest
 import kafka.log.Log._
-import kafka.common.KafkaException
+import kafka.common.{TopicAndPartition, KafkaException}
 
 /**
  * The guy who creates and hands out logs
@@ -104,11 +104,11 @@ private[kafka] class LogManager(val config: KafkaConfig,
     }
   }
 
-  def getOffsets(offsetRequest: OffsetRequest): Array[Long] = {
-    val log = getLog(offsetRequest.topic, offsetRequest.partition)
+  def getOffsets(topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
+    val log = getLog(topicAndPartition.topic, topicAndPartition.partition)
     log match {
-      case Some(l) => l.getOffsetsBefore(offsetRequest)
-      case None => getEmptyOffsets(offsetRequest)
+      case Some(l) => l.getOffsetsBefore(timestamp, maxNumOffsets)
+      case None => getEmptyOffsets(timestamp)
     }
   }
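
Note (not part of the patch): Log.getOffsetsBefore now takes the timestamp and maxNumOffsets directly, returns a Seq guaranteed to be in descending offset order, and LogManager falls back to getEmptyOffsets when no log exists for the partition. A rough server-side sketch, with log illustrative and the example values taken from LogOffsetTest below:

    // segment start offsets at or before the given timestamp, newest first
    val offsets: Seq[Long] = log.getOffsetsBefore(OffsetRequest.LatestTime, 10)
    // e.g. Seq(240L, 216L, 108L, 0L)
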
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5667fbc..fd56de6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -350,21 +350,36 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestLogger.trace("Handling offset request " + offsetRequest.toString)
     trace("Handling offset request " + offsetRequest.toString)
 
-    var response: OffsetResponse = null
-    try {
-      // ensure leader exists
-      replicaManager.getLeaderReplicaIfLocal(offsetRequest.topic, offsetRequest.partition)
-      val offsets = replicaManager.logManager.getOffsets(offsetRequest)
-      response = new OffsetResponse(offsetRequest.versionId, offsets)
-    } catch {
-      case ioe: IOException =>
-        fatal("Halting due to unrecoverable I/O error while handling producer request: " + ioe.getMessage, ioe)
-        System.exit(1)
-      case e =>
-        warn("Error while responding to offset request", e)
-        response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],
-                                      ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
-    }
+    val responseMap = offsetRequest.requestInfo.map(elem => {
+      val (topicAndPartition, partitionOffsetRequestInfo) = elem
+      try {
+        // ensure leader exists
+        val leader = replicaManager.getLeaderReplicaIfLocal(
+          topicAndPartition.topic, topicAndPartition.partition)
+        val offsets = {
+          val allOffsets = replicaManager.logManager.getOffsets(topicAndPartition,
+                                                                partitionOffsetRequestInfo.time,
+                                                                partitionOffsetRequestInfo.maxNumOffsets)
+          if (offsetRequest.isFromFollower) allOffsets
+          else {
+            val hw = leader.highWatermark
+            if (allOffsets.exists(_ > hw))
+              immutable.Seq(hw) ++ allOffsets.dropWhile(_ > hw)
+            else allOffsets
+          }
+        }
+        (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets))
+      } catch {
+        case ioe: IOException =>
+          fatal("Halting due to unrecoverable I/O error while handling offset request: " + ioe.getMessage, ioe)
+          // compiler requires scala.sys.exit (not System.exit).
+          exit(1)
+        case e =>
+          warn("Error while responding to offset request", e)
+          (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
+      }
+    })
+    val response = OffsetResponse(OffsetRequest.CurrentVersion, responseMap)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
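
Note (not part of the patch): for requests that do not come from a follower, handleOffsetRequest hides offsets beyond the leader's high watermark by collapsing them onto the HW itself. The same rule in isolation (helper name illustrative), remembering that offsets arrive in descending order:

    def clampToHighWatermark(allOffsets: Seq[Long], hw: Long, fromFollower: Boolean): Seq[Long] =
      if (fromFollower) allOffsets
      else if (allOffsets.exists(_ > hw)) hw +: allOffsets.dropWhile(_ > hw)
      else allOffsets

    clampToHighWatermark(Seq(240L, 216L, 108L, 0L), hw = 150L, fromFollower = false)
    // => Seq(150L, 108L, 0L)
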
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 2fb0630..64cc2e8 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.api.{OffsetRequest, PartitionData}
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, PartitionData}
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
 import kafka.common.TopicAndPartition
@@ -51,10 +51,15 @@ class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: Kafk
   // handle a partition whose offset is out of range and return a new fetch offset
   def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = {
     // This means the local replica is out of date. Truncate the log and catch up from beginning.
-    val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, OffsetRequest.EarliestTime, 1)
+    val topicAndPartition = TopicAndPartition(topic, partitionId)
+    val request = OffsetRequest(
+      replicaId = brokerConfig.brokerId,
+      requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
+    )
+    val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
     val replica = replicaMgr.getReplica(topic, partitionId).get
-    replica.log.get.truncateAndStartWithNewOffset(offsets(0))
-    return offsets(0)
+    replica.log.get.truncateAndStartWithNewOffset(offset)
+    offset
   }
 
   // any logic for partitions whose leader has changed
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index ee3962f..76ecebe 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -23,9 +23,14 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import kafka.consumer.SimpleConsumer
 import collection.mutable.Map
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
+import kafka.common.TopicAndPartition
+import scala.collection._
+
+
 object ConsumerOffsetChecker extends Logging {
 
-  private val consumerMap: Map[String, Option[SimpleConsumer]] = Map()
+  private val consumerMap: mutable.Map[String, Option[SimpleConsumer]] = mutable.Map()
 
   private val BidPidPattern = """(\d+)-(\d+)""".r
@@ -61,8 +66,10 @@ object ConsumerOffsetChecker extends Logging {
             bid, getConsumer(zkClient, bid))
           consumerOpt match {
             case Some(consumer) =>
-              val logSize =
-                consumer.getOffsetsBefore(topic, pid.toInt, -1, 1).last.toLong
+              val topicAndPartition = TopicAndPartition(topic, pid.toInt)
+              val request =
+                OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+              val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
               println("%20s%d".format("Log size = ", logSize))
               println("%20s%,d (%,.2fG)".format("= ", logSize, logSize / math.pow(1024, 3)))
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 034b734..e78d53d 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -21,6 +21,9 @@ package kafka.tools
 import kafka.consumer._
 import joptsimple._
 import java.net.URI
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
+import kafka.common.TopicAndPartition
+
 
 object GetOffsetShell {
@@ -65,7 +68,9 @@ object GetOffsetShell {
     var time = options.valueOf(timeOpt).longValue
     val nOffsets = options.valueOf(nOffsetsOpt).intValue
     val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000)
-    val offsets = consumer.getOffsetsBefore(topic, partition, time, nOffsets)
+    val topicAndPartition = TopicAndPartition(topic, partition)
+    val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
+    val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
     println("get " + offsets.length + " results")
     for (offset <- offsets)
       println(offset)
diff --git a/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
index 13bc4fa..a4df97b 100644
--- a/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
@@ -19,8 +19,8 @@ package kafka.utils
 
 import org.I0Itec.zkclient.ZkClient
 import kafka.consumer.{SimpleConsumer, ConsumerConfig}
-import kafka.api.OffsetRequest
-import kafka.common.KafkaException
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
+import kafka.common.{TopicAndPartition, KafkaException}
 
 /**
  *  A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.
@@ -43,7 +43,6 @@ object UpdateOffsetsInZK {
   }
 
   private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
-    val cluster = ZkUtils.getCluster(zkClient)
     val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic))
     var partitions: Seq[Int] = Nil
@@ -65,11 +64,13 @@ object UpdateOffsetsInZK {
       ZkUtils.getBrokerInfo(zkClient, broker) match {
         case Some(brokerInfo) =>
           val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
-          val offsets = consumer.getOffsetsBefore(topic, partition, offsetOption, 1)
+          val topicAndPartition = TopicAndPartition(topic, partition)
+          val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1)))
+          val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
           val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
 
-          println("updating partition " + partition + " with new offset: " + offsets(0))
-          ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offsets(0).toString)
+          println("updating partition " + partition + " with new offset: " + offset)
+          ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
           numParts += 1
         case None => throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker))
       }
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index e999141..c6a8239 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -31,6 +31,7 @@ import joptsimple.{OptionSpec, OptionSet, OptionParser}
 import kafka.common.KafkaException
 import kafka.cluster.Broker
 import util.parsing.json.JSON
+import kafka.api.RequestOrResponse
 
 /**
@@ -153,7 +154,7 @@ object Utils extends Logging {
   * @param buffer The buffer to read from
   * @param encoding The encoding in which to read the string
   */
-  def readShortString(buffer: ByteBuffer, encoding: String = "UTF-8"): String = {
+  def readShortString(buffer: ByteBuffer, encoding: String = RequestOrResponse.DefaultCharset): String = {
    val size: Int = buffer.getShort()
    if(size < 0)
      return null
@@ -168,7 +169,7 @@ object Utils extends Logging {
   * @param string The string to write
   * @param encoding The encoding in which to write the string
   */
-  def writeShortString(buffer: ByteBuffer, string: String, encoding: String = "UTF-8"): Unit = {
+  def writeShortString(buffer: ByteBuffer, string: String, encoding: String = RequestOrResponse.DefaultCharset) {
    if(string == null) {
      buffer.putShort(-1)
    } else if(string.length > Short.MaxValue) {
@@ -184,7 +185,7 @@ object Utils extends Logging {
   * @param string The string to write
   * @param encoding The encoding in which to write the string
   */
-  def shortStringLength(string: String, encoding: String = "UTF-8"): Int = {
+  def shortStringLength(string: String, encoding: String = RequestOrResponse.DefaultCharset): Int = {
    if(string == null) {
      2
    } else {
diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
index e23fc8d..fd248ee 100644
--- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
@@ -22,16 +22,15 @@ import kafka.utils._
 import kafka.server.{KafkaConfig, KafkaServer}
 import junit.framework.Assert._
 import java.util.{Random, Properties}
-import collection.mutable.WrappedArray
 import kafka.consumer.SimpleConsumer
 import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
-import kafka.api.{FetchRequestBuilder, OffsetRequest}
+import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.utils.TestUtils._
-import kafka.common.UnknownTopicOrPartitionException
+import kafka.common.{ErrorMapping, TopicAndPartition, UnknownTopicOrPartitionException}
 
 object LogOffsetTest {
   val random = new Random()
@@ -67,12 +66,12 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testGetOffsetsForUnknownTopic() {
-    try {
-      simpleConsumer.getOffsetsBefore("foo", 0, OffsetRequest.LatestTime, 10)
-      fail("Should fail with UnknownTopicException since topic foo was never created")
-    }catch {
-      case e: UnknownTopicOrPartitionException => // this is ok
-    }
+    val topicAndPartition = TopicAndPartition("foo", 0)
+    val request = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
+    val offsetResponse = simpleConsumer.getOffsetsBefore(request)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+                 offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
   }
 
   @Test
@@ -92,16 +91,17 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
-
-    val offsetRequest = new OffsetRequest(topic, part, OffsetRequest.LatestTime, 10)
-
-    val offsets = log.getOffsetsBefore(offsetRequest)
-    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+    val offsets = log.getOffsetsBefore(OffsetRequest.LatestTime, 10)
+    assertEquals(Seq(240L, 216L, 108L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
-    val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
-      OffsetRequest.LatestTime, 10)
-    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)),
+      replicaId = 0)
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(240L, 216L, 108L, 0L), consumerOffsets)
 
     // try to fetch using latest offset
     val fetchResponse = simpleConsumer.fetch(
@@ -124,8 +124,11 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     var offsetChanged = false
     for(i <- 1 to 14) {
-      val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, 0,
-        OffsetRequest.EarliestTime, 1)
+      val topicAndPartition = TopicAndPartition(topic, 0)
+      val offsetRequest =
+        OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
+      val consumerOffsets =
+        simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
 
       if(consumerOffsets(0) == 1) {
         offsetChanged = true
@@ -153,14 +156,15 @@
     time.sleep(20)
     val now = time.milliseconds
 
-    val offsetRequest = new OffsetRequest(topic, part, now, 10)
-    val offsets = log.getOffsetsBefore(offsetRequest)
-    println("Offsets = " + offsets.mkString(","))
-    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+    val offsets = log.getOffsetsBefore(now, 10)
+    assertEquals(Seq(240L, 216L, 108L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
-    val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10)
-    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(240L, 216L, 108L, 0L), consumerOffsets)
   }
 
   @Test
@@ -179,16 +183,17 @@
     log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
-    val offsetRequest = new OffsetRequest(topic, part,
-      OffsetRequest.EarliestTime, 10)
-    val offsets = log.getOffsetsBefore(offsetRequest)
+    val offsets = log.getOffsetsBefore(OffsetRequest.EarliestTime, 10)
 
-    assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+    assertEquals(Seq(0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
-    val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
-      OffsetRequest.EarliestTime, 10)
-    assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest =
+      OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10)))
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(0L), consumerOffsets)
   }
 
   private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
diff --git a/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala b/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
index 02b4d63..c463763 100644
--- a/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
@@ -109,12 +109,15 @@ object RpcDataSerializationTestUtils{
     FetchResponse(1, 1, topicData)
   }
 
-  def createTestOffsetRequest: OffsetRequest = {
-    new OffsetRequest(topic1, 1, 1000, 200)
-  }
+  def createTestOffsetRequest = new OffsetRequest(
+    collection.immutable.Map(TopicAndPartition(topic1, 1) -> PartitionOffsetRequestInfo(1000, 200)),
+    replicaId = 0
+  )
 
   def createTestOffsetResponse: OffsetResponse = {
-    new OffsetResponse(1, Array(1000l, 2000l, 3000l, 4000l))
+    new OffsetResponse(OffsetRequest.CurrentVersion, collection.immutable.Map(
+      TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(ErrorMapping.NoError, Seq(1000l, 2000l, 3000l, 4000l)))
+    )
   }
 
   def createTestTopicMetadataRequest: TopicMetadataRequest = {
@@ -191,7 +194,7 @@ class RpcDataSerializationTest extends JUnitSuite {
     assertEquals("The original and deserialzed fetchRequest should be the same", fetchRequest,
                  deserializedFetchRequest)
 
-    buffer = ByteBuffer.allocate(offsetRequest.sizeInBytes())
+    buffer = ByteBuffer.allocate(offsetRequest.sizeInBytes)
     offsetRequest.writeTo(buffer)
     buffer.rewind()
     val deserializedOffsetRequest = OffsetRequest.readFrom(buffer)
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index a2789c0..66ecd60 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -19,12 +19,16 @@ package kafka.server
 import kafka.cluster.{Partition, Replica}
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message}
-import kafka.network.RequestChannel
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.utils.{Time, TestUtils, MockTime}
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
-import kafka.api.{FetchRequest, FetchRequestBuilder}
+import kafka.api._
+import scala.Some
+import org.junit.Assert._
+import kafka.common.TopicAndPartition
+
 
 class SimpleFetchTest extends JUnit3Suite {
 
@@ -88,7 +92,7 @@ class SimpleFetchTest extends JUnit3Suite {
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
-          .replicaId(FetchRequest.NonFollowerId)
+          .replicaId(Request.NonFollowerId)
           .addFetch(topic, partitionId, 0, hw*2)
           .build()
     val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch)
@@ -98,6 +102,33 @@
     // make sure the log only reads bytes between 0->HW (5)
     EasyMock.verify(log)
+
+    // Test offset request from non-replica
+    val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
+    val offsetRequest = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+    val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
+
+    EasyMock.reset(logManager)
+    EasyMock.reset(replicaManager)
+
+    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
+    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
+    EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
+
+    EasyMock.replay(replicaManager)
+    EasyMock.replay(logManager)
+
+    apis.handleOffsetRequest(new RequestChannel.Request(processor = 0,
+                                                        requestKey = 5,
+                                                        buffer = offsetRequestBB,
+                                                        startTimeNs = 1))
+    val offsetResponseBuffer = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
+    val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
+    EasyMock.verify(replicaManager)
+    EasyMock.verify(logManager)
+    assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
+    assertEquals(hw.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
   }
 
   /**
@@ -173,6 +204,34 @@
      * an offset of 15
      */
     EasyMock.verify(log)
+
+    // Test offset request from replica
+    val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
+    val offsetRequest = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)),
+      replicaId = followerReplicaId)
+    val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
+
+    EasyMock.reset(logManager)
+    EasyMock.reset(replicaManager)
+
+    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
+    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
+    EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
+
+    EasyMock.replay(replicaManager)
+    EasyMock.replay(logManager)
+
+    apis.handleOffsetRequest(new RequestChannel.Request(processor = 1,
+                                                        requestKey = 5,
+                                                        buffer = offsetRequestBB,
+                                                        startTimeNs = 1))
+    val offsetResponseBuffer = requestChannel.receiveResponse(1).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
+    val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
+    EasyMock.verify(replicaManager)
+    EasyMock.verify(logManager)
+    assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
+    assertEquals(leo.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
   }
 
   private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
diff --git a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
index f1c19dd..40d1ca8 100644
--- a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
@@ -19,11 +19,12 @@ package kafka.perf
 
 import java.net.URI
 import java.text.SimpleDateFormat
-import kafka.api.{FetchRequestBuilder, OffsetRequest}
+import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.consumer.SimpleConsumer
 import kafka.utils._
 import org.apache.log4j.Logger
-import kafka.message.ByteBufferMessageSet
+import kafka.common.TopicAndPartition
+
 
 /**
  * Performance test for the simple consumer
@@ -44,8 +45,11 @@ object SimpleConsumerPerformance {
     val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize)
 
     // reset to latest or smallest offset
-    var offset: Long = if(config.fromLatest) consumer.getOffsetsBefore(config.topic, config.partition, OffsetRequest.LatestTime, 1).head
-                       else consumer.getOffsetsBefore(config.topic, config.partition, OffsetRequest.EarliestTime, 1).head
+    val topicAndPartition = TopicAndPartition(config.topic, config.partition)
+    val request = OffsetRequest(Map(
+      topicAndPartition -> PartitionOffsetRequestInfo(if (config.fromLatest) OffsetRequest.LatestTime else OffsetRequest.EarliestTime, 1)
+    ))
+    var offset: Long = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
 
     val startMs = System.currentTimeMillis
     var done = false