diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java index 1c05a77..648b4b2 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java @@ -16,27 +16,26 @@ */ package kafka.etl; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.zip.CRC32; import kafka.api.FetchRequest; -import kafka.javaapi.MultiFetchResponse; +import kafka.api.FetchRequestBuilder; import kafka.api.OffsetRequest; import kafka.common.ErrorMapping; +import kafka.javaapi.FetchResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.message.Message; +import kafka.javaapi.message.MessageSet; import kafka.message.MessageAndOffset; -import kafka.message.MessageSet; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.MultipleOutputs; + +import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; @SuppressWarnings({ "deprecation"}) public class KafkaETLContext { @@ -59,7 +58,8 @@ public class KafkaETLContext { protected long _offset = Long.MAX_VALUE; /*current offset*/ protected long _count; /*current count*/ - protected MultiFetchResponse _response = null; /*fetch response*/ + protected int requestId = 0; /* the id of the next fetch request */ + protected FetchResponse _response = null; /*fetch response*/ protected Iterator _messageIt = null; /*message iterator*/ protected Iterator _respIterator = null; protected int _retry = 0; @@ -149,15 +149,19 @@ public class KafkaETLContext { public boolean fetchMore () throws IOException { if (!hasMore()) return false; - FetchRequest fetchRequest = - new FetchRequest(_request.getTopic(), _request.getPartition(), _offset, _bufferSize); - List array = new ArrayList(); - array.add(fetchRequest); + FetchRequest fetchRequest = new FetchRequestBuilder() + .correlationId(requestId) + .clientId(_request.clientId()) + .addFetch(_request.getTopic(), _request.getPartition(), _offset, _bufferSize) + .build(); long tempTime = System.currentTimeMillis(); - _response = _consumer.multifetch(array); - if(_response != null) - _respIterator = _response.iterator(); + _response = _consumer.fetch(fetchRequest); + if(_response != null) { + _respIterator = new ArrayList(){{ + add((ByteBufferMessageSet) _response.messageSet(_request.getTopic(), _request.getPartition())); + }}.iterator(); + } _requestTime += (System.currentTimeMillis() - tempTime); return true; diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java index defb51b..87df0ea 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLRequest.java @@ -29,6 +29,7 @@ public class KafkaETLRequest { URI _uri; int _partition; long _offset = DEFAULT_OFFSET; + String _clientId = "KafkaHadoopETL"; public KafkaETLRequest() { @@ -83,11 +84,11 @@ public class KafkaETLRequest { _offset = offset; } - public String getTopic() { return _topic;} - public URI getURI () { return _uri;} - public int 
getPartition() { return _partition;} - - public long getOffset() { return _offset;} + public String getTopic() { return _topic; } + public URI getURI () { return _uri; } + public int getPartition() { return _partition; } + public long getOffset() { return _offset; } + public String clientId() { return _clientId; } public boolean isValidOffset() { return _offset >= 0; diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 589dfda..b97d468 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -20,32 +20,149 @@ package kafka.api import java.nio._ import kafka.network._ import kafka.utils._ +import scala.collection.mutable.{HashMap, Buffer, ListBuffer} + +object OffsetDetail { + + def readFrom(buffer: ByteBuffer): OffsetDetail = { + val topic = Utils.readShortString(buffer, "UTF-8") + + val partitionsCount = buffer.getInt + val partitions = new Array[Int](partitionsCount) + for (i <- 0 until partitions.length) + partitions(i) = buffer.getInt + + val offsetsCount = buffer.getInt + val offsets = new Array[Long](offsetsCount) + for (i <- 0 until offsets.length) + offsets(i) = buffer.getLong + + val fetchesCount = buffer.getInt + val fetchSizes = new Array[Int](fetchesCount) + for (i <- 0 until fetchSizes.length) + fetchSizes(i) = buffer.getInt + + new OffsetDetail(topic, partitions, offsets, fetchSizes) + } + +} + +case class OffsetDetail(topic: String, partitions: Seq[Int], offsets: Seq[Long], fetchSizes: Seq[Int]) { + + def writeTo(buffer: ByteBuffer) { + Utils.writeShortString(buffer, topic, "UTF-8") + + if(partitions.size > Int.MaxValue || offsets.size > Int.MaxValue || fetchSizes.size > Int.MaxValue) + throw new IllegalArgumentException("Number of fetches in FetchRequest exceeds " + Int.MaxValue + ".") + + buffer.putInt(partitions.length) + partitions.foreach(buffer.putInt(_)) + + buffer.putInt(offsets.length) + offsets.foreach(buffer.putLong(_)) + + buffer.putInt(fetchSizes.length) + fetchSizes.foreach(buffer.putInt(_)) + } + + def sizeInBytes(): Int = { + 2 + topic.length() + // topic string + partitions.foldLeft(4)((s, _) => s + 4) + // each request partition (int) + offsets.foldLeft(4)((s, _) => s + 8) + // each request offset (long) + fetchSizes.foldLeft(4)((s,_) => s + 4) // each request fetch size + } +} object FetchRequest { - + val CurrentVersion = 1.shortValue() + def readFrom(buffer: ByteBuffer): FetchRequest = { - val topic = Utils.readShortString(buffer, "UTF-8") - val partition = buffer.getInt() - val offset = buffer.getLong() - val size = buffer.getInt() - new FetchRequest(topic, partition, offset, size) + val correlationId = buffer.getInt + val versionId = buffer.getShort + val clientId = Utils.readShortString(buffer, "UTF-8") + val replicaId = buffer.getInt + val maxWait = buffer.getInt + val minBytes = buffer.getInt + val offsetsCount = buffer.getInt + val offsetInfo = new Array[OffsetDetail](offsetsCount) + for(i <- 0 until offsetInfo.length) + offsetInfo(i) = OffsetDetail.readFrom(buffer) + + new FetchRequest(correlationId, versionId, clientId, replicaId, maxWait, minBytes, offsetInfo) } + } -class FetchRequest(val topic: String, - val partition: Int, - val offset: Long, - val maxSize: Int) extends Request(RequestKeys.Fetch) { - +case class FetchRequest( correlationId: Int, + versionId: Short, + clientId: String, + replicaId: Int, + maxWait: Int, + minBytes: Int, + offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) { + def 
writeTo(buffer: ByteBuffer) { - Utils.writeShortString(buffer, topic) - buffer.putInt(partition) - buffer.putLong(offset) - buffer.putInt(maxSize) + buffer.putInt(correlationId) + buffer.putShort(versionId) + Utils.writeShortString(buffer, clientId, "UTF-8") + buffer.putInt(replicaId) + buffer.putInt(maxWait) + buffer.putInt(minBytes) + buffer.putInt(offsetInfo.size) + for(topicDetail <- offsetInfo) { + topicDetail.writeTo(buffer) + } } - - def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4 - override def toString(): String= "FetchRequest(topic:" + topic + ", part:" + partition +" offset:" + offset + - " maxSize:" + maxSize + ")" + def sizeInBytes: Int = 4 + 2 + 2 + clientId.length() + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes()) +} + +class FetchRequestBuilder() { + private var correlationId = -1 + private val versionId = FetchRequest.CurrentVersion + private var clientId = "" + private var replicaId = -1 // sensible default + private var maxWait = -1 // sensible default + private var minBytes = -1 // sensible default + private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]] + + def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = { + val topicData = requestMap.getOrElseUpdate(topic, (ListBuffer[Int](), ListBuffer[Long](), ListBuffer[Int]())) + topicData._1.append(partition) + topicData._2.append(offset) + topicData._3.append(fetchSize) + this + } + + def correlationId(correlationId: Int): FetchRequestBuilder = { + this.correlationId = correlationId + this + } + + def clientId(clientId: String): FetchRequestBuilder = { + this.clientId = clientId + this + } + + def replicaId(replicaId: Int): FetchRequestBuilder = { + this.replicaId = replicaId + this + } + + def maxWait(maxWait: Int): FetchRequestBuilder = { + this.maxWait = maxWait + this + } + + def minBytes(minBytes: Int): FetchRequestBuilder = { + this.minBytes = minBytes + this + } + + def build() = { + val offsetDetails = requestMap.map{ topicData => + new OffsetDetail(topicData._1, topicData._2._1.toArray, topicData._2._2.toArray, topicData._2._3.toArray) + } + new FetchRequest(correlationId, versionId, clientId, replicaId, maxWait, minBytes, offsetDetails.toArray[OffsetDetail]) + } } diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index 3f5825d..b0e1036 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -20,8 +20,7 @@ package kafka.api object RequestKeys { val Produce: Short = 0 val Fetch: Short = 1 - val MultiFetch: Short = 2 - val MultiProduce: Short = 3 - val Offsets: Short = 4 - val TopicMetadata: Short = 5 + val MultiProduce: Short = 2 + val Offsets: Short = 3 + val TopicMetadata: Short = 4 } diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index c4b3e71..7c4f737 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -50,8 +50,7 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) { /** consumer id: generated automatically if not set. * Set this explicitly for only testing purpose. 
*/ - val consumerId: Option[String] = /** TODO: can be written better in scala 2.8 */ - if (Utils.getString(props, "consumerid", null) != null) Some(Utils.getString(props, "consumerid")) else None + val consumerId: Option[String] = Option(Utils.getString(props, "consumerid", null)) /** the socket timeout for network requests */ val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout) diff --git a/core/src/main/scala/kafka/consumer/FetcherRunnable.scala b/core/src/main/scala/kafka/consumer/FetcherRunnable.scala index f573f7f..951a7e5 100644 --- a/core/src/main/scala/kafka/consumer/FetcherRunnable.scala +++ b/core/src/main/scala/kafka/consumer/FetcherRunnable.scala @@ -17,13 +17,14 @@ package kafka.consumer +import java.io.IOException import java.util.concurrent.CountDownLatch -import kafka.common.ErrorMapping +import kafka.api.{FetchRequestBuilder, OffsetRequest} import kafka.cluster.{Partition, Broker} -import kafka.api.{OffsetRequest, FetchRequest} -import org.I0Itec.zkclient.ZkClient +import kafka.common.ErrorMapping +import kafka.message.ByteBufferMessageSet import kafka.utils._ -import java.io.IOException +import org.I0Itec.zkclient.ZkClient class FetcherRunnable(val name: String, val zkClient : ZkClient, @@ -50,18 +51,26 @@ class FetcherRunnable(val name: String, info(name + " start fetching topic: " + infopti.topic + " part: " + infopti.partition.partId + " offset: " + infopti.getFetchOffset + " from " + broker.host + ":" + broker.port) + var reqId = 0 try { while (!stopped) { - val fetches = partitionTopicInfos.map(info => - new FetchRequest(info.topic, info.partition.partId, info.getFetchOffset, config.fetchSize)) - - trace("fetch request: " + fetches.toString) - - val response = simpleConsumer.multifetch(fetches : _*) + // TODO: fix up the max wait and min bytes + val builder = new FetchRequestBuilder(). + correlationId(reqId). + clientId(config.consumerId.getOrElse(name)). + maxWait(0). 
+ minBytes(0) + partitionTopicInfos.foreach(pti => + builder.addFetch(pti.topic, pti.partition.partId, pti.getFetchOffset(), config.fetchSize) + ) + + val fetchRequest = builder.build() + trace("fetch request: " + fetchRequest) + val response = simpleConsumer.fetch(fetchRequest) var read = 0L - - for((messages, infopti) <- response.zip(partitionTopicInfos)) { + for(infopti <- partitionTopicInfos) { + val messages = response.messageSet(infopti.topic, infopti.partition.partId).asInstanceOf[ByteBufferMessageSet] try { var done = false if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) { @@ -76,8 +85,7 @@ class FetcherRunnable(val name: String, } if (!done) read += infopti.enqueue(messages, infopti.getFetchOffset) - } - catch { + } catch { case e1: IOException => // something is wrong with the socket, re-throw the exception to stop the fetcher throw e1 @@ -91,6 +99,7 @@ class FetcherRunnable(val name: String, throw e2 } } + reqId = if(reqId == Int.MaxValue) 0 else reqId + 1 trace("fetched bytes: " + read) if(read == 0) { @@ -98,8 +107,7 @@ class FetcherRunnable(val name: String, Thread.sleep(config.fetcherBackoffMs) } } - } - catch { + } catch { case e => if (stopped) info("FecherRunnable " + this + " interrupted") diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 7adc1a3..29d4816 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -20,7 +20,6 @@ package kafka.consumer import java.net._ import java.nio.channels._ import kafka.api._ -import kafka.message._ import kafka.network._ import kafka.utils._ @@ -72,7 +71,7 @@ class SimpleConsumer(val host: String, * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched. * @return a set of fetched messages */ - def fetch(request: FetchRequest): ByteBufferMessageSet = { + def fetch(request: FetchRequest): FetchResponse = { lock synchronized { val startTime = SystemTime.nanoseconds getOrMakeConnection() @@ -88,51 +87,19 @@ class SimpleConsumer(val host: String, channel = connect sendRequest(request) response = getResponse - }catch { + } catch { case ioe: java.io.IOException => channel = null; throw ioe; } case e => throw e } - val endTime = SystemTime.nanoseconds - SimpleConsumerStats.recordFetchRequest(endTime - startTime) - SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit) - new ByteBufferMessageSet(response._1.buffer, request.offset, response._2) - } - } + val fetchResponse = FetchResponse.readFrom(response._1.buffer) + val fetchedSize = fetchResponse.sizeInBytes - /** - * Combine multiple fetch requests in one call. - * - * @param fetches a sequence of fetch requests. 
- * @return a sequence of fetch responses - */ - def multifetch(fetches: FetchRequest*): MultiFetchResponse = { - lock synchronized { - val startTime = SystemTime.nanoseconds - getOrMakeConnection() - var response: Tuple2[Receive,Int] = null - try { - sendRequest(new MultiFetchRequest(fetches.toArray)) - response = getResponse - } catch { - case e : java.io.IOException => - info("Reconnect in multifetch due to socket error: ", e) - // retry once - try { - channel = connect - sendRequest(new MultiFetchRequest(fetches.toArray)) - response = getResponse - }catch { - case ioe: java.io.IOException => channel = null; throw ioe; - } - case e => throw e - } val endTime = SystemTime.nanoseconds SimpleConsumerStats.recordFetchRequest(endTime - startTime) - SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit) + SimpleConsumerStats.recordConsumptionThroughput(fetchedSize) - // error code will be set on individual messageset inside MultiFetchResponse - new MultiFetchResponse(response._1.buffer, fetches.length, fetches.toArray.map(f => f.offset)) + fetchResponse } } diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala index 51bf516..e92f2c3 100644 --- a/core/src/main/scala/kafka/consumer/TopicCount.scala +++ b/core/src/main/scala/kafka/consumer/TopicCount.scala @@ -32,8 +32,7 @@ private[kafka] object TopicCount extends Logging { case Some(m) => topMap = m.asInstanceOf[Map[String,Int]] case None => throw new RuntimeException("error constructing TopicCount : " + jsonString) } - } - catch { + } catch { case e => error("error parsing consumer json string " + jsonString, e) throw e @@ -46,8 +45,7 @@ private[kafka] object TopicCount extends Logging { private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) { - def getConsumerThreadIdsPerTopic() - : Map[String, Set[String]] = { + def getConsumerThreadIdsPerTopic(): Map[String, Set[String]] = { val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[String]]() for ((topic, nConsumers) <- topicCountMap) { val consumerSet = new mutable.HashSet[String] diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 221d2a5..83c7bf5 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -105,8 +105,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, def this(config: ConsumerConfig) = this(config, true) - def createMessageStreams[T](topicCountMap: Map[String,Int], - decoder: Decoder[T]) + def createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T]) : Map[String,List[KafkaMessageStream[T]]] = { consume(topicCountMap, decoder) } @@ -138,8 +137,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, zkClient.close() zkClient = null } - } - catch { + } catch { case e => fatal("error during consumer connector shutdown", e) } @@ -147,8 +145,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } } - def consume[T](topicCountMap: scala.collection.Map[String,Int], - decoder: Decoder[T]) + def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T]) : Map[String,List[KafkaMessageStream[T]]] = { debug("entering consume ") if (topicCountMap == null) @@ -159,13 +156,13 @@ private[kafka] class ZookeeperConsumerConnector(val 
config: ConsumerConfig, var consumerUuid : String = null config.consumerId match { - case Some(consumerId) // for testing only - => consumerUuid = consumerId - case None // generate unique consumerId automatically - => val uuid = UUID.randomUUID() - consumerUuid = "%s-%d-%s".format( - InetAddress.getLocalHost.getHostName, System.currentTimeMillis, - uuid.getMostSignificantBits().toHexString.substring(0,8)) + case Some(consumerId) => // for testing only + consumerUuid = consumerId + case None => // generate unique consumerId automatically + val uuid = UUID.randomUUID() + consumerUuid = "%s-%d-%s".format( InetAddress.getLocalHost.getHostName, + System.currentTimeMillis, + uuid.getMostSignificantBits().toHexString.substring(0,8) ) } val consumerIdString = config.groupId + "_" + consumerUuid val topicCount = new TopicCount(consumerIdString, topicCountMap) @@ -243,8 +240,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, try { updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name, newOffset.toString) - } - catch { + } catch { case t: Throwable => // log it and let it go warn("exception during commitOffsets", t) @@ -321,8 +317,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, ConsumerConfig.SocketBufferSize) val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1) producedOffset = offsets(0) - } - catch { + } catch { case e => error("error in earliestOrLatestOffset() ", e) } @@ -419,8 +414,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val cluster = getCluster(zkClient) try { done = rebalance(cluster) - } - catch { + } catch { case e => /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. * For example, a ZK node can disappear between the time we get all children and the time we try to get @@ -433,7 +427,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, info("end rebalancing consumer " + consumerIdString + " try #" + i) if (done) { return - }else { + } else { /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should * clear the cache */ info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered") @@ -529,7 +523,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, oldConsumersPerTopicMap = consumersPerTopicMap updateFetcher(cluster, kafkaMessageStreams) true - }else + } else false } @@ -611,8 +605,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId) info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic) true - } - catch { + } catch { case e: ZkNodeExistsException => // The node hasn't been deleted by the original owner. So wait a bit and retry. 
info("waiting for the partition ownership to be deleted: " + partition) diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala index 20ca193..30d44fb 100644 --- a/core/src/main/scala/kafka/javaapi/Implicits.scala +++ b/core/src/main/scala/kafka/javaapi/Implicits.scala @@ -115,9 +115,6 @@ private[javaapi] object Implicits extends Logging { } } - implicit def toMultiFetchResponse(response: kafka.javaapi.MultiFetchResponse): kafka.api.MultiFetchResponse = - response.underlying - - implicit def toJavaMultiFetchResponse(response: kafka.api.MultiFetchResponse): kafka.javaapi.MultiFetchResponse = - new kafka.javaapi.MultiFetchResponse(response.buffer, response.numSets, response.offsets) + implicit def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse = + new kafka.javaapi.FetchResponse(response.correlationId, response.versionId, response.error, response.data) } diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala index 9ba324d..c1b9fb9 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala @@ -17,10 +17,9 @@ package kafka.javaapi.consumer -import kafka.utils.threadsafe -import kafka.javaapi.message.ByteBufferMessageSet -import kafka.javaapi.MultiFetchResponse import kafka.api.FetchRequest +import kafka.javaapi.FetchResponse +import kafka.utils.threadsafe /** * A consumer of kafka messages @@ -38,24 +37,12 @@ class SimpleConsumer(val host: String, * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched. * @return a set of fetched messages */ - def fetch(request: FetchRequest): ByteBufferMessageSet = { + def fetch(request: FetchRequest): FetchResponse = { import kafka.javaapi.Implicits._ underlying.fetch(request) } /** - * Combine multiple fetch requests in one call. - * - * @param fetches a sequence of fetch requests. - * @return a sequence of fetch responses - */ - def multifetch(fetches: java.util.List[FetchRequest]): MultiFetchResponse = { - import scala.collection.JavaConversions._ - import kafka.javaapi.Implicits._ - underlying.multifetch(asBuffer(fetches): _*) - } - - /** * Get a list of valid offsets (up to maxSize) before the given time. * The result is a list of offsets, in descending order. 
* diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala index 08f454c..cab1864 100644 --- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala +++ b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala @@ -28,7 +28,7 @@ import kafka.utils._ @nonthreadsafe private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive with Logging { - private val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4) + private val sizeBuffer = ByteBuffer.allocate(4) private var contentBuffer: ByteBuffer = null def this() = this(Int.MaxValue) @@ -78,12 +78,10 @@ private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive var buffer: ByteBuffer = null try { buffer = ByteBuffer.allocate(size) - } - catch { - case e: OutOfMemoryError => { + } catch { + case e: OutOfMemoryError => error("OOME with size " + size, e) throw e - } case e2 => throw e2 } diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index d715b03..a36e21d 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -21,8 +21,8 @@ import java.util.concurrent._ object RequestChannel { val AllDone = new Request(1, 2, null, 0) - case class Request(val processor: Int, requestKey: Any, request: Receive, start: Long) - case class Response(val processor: Int, requestKey: Any, response: Send, start: Long, ellapsed: Long) + case class Request(processor: Int, requestKey: Any, request: Receive, start: Long) + case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsed: Long) } class RequestChannel(val numProcessors: Int, val queueSize: Int) { diff --git a/core/src/main/scala/kafka/network/SocketServerStats.scala b/core/src/main/scala/kafka/network/SocketServerStats.scala index 2ec1fa9..7bbf7d2 100644 --- a/core/src/main/scala/kafka/network/SocketServerStats.scala +++ b/core/src/main/scala/kafka/network/SocketServerStats.scala @@ -50,7 +50,7 @@ class SocketServerStats(val monitorDurationNs: Long, val time: Time) extends Soc requestTypeId match { case r if r == RequestKeys.Produce || r == RequestKeys.MultiProduce => produceTimeStats.recordRequestMetric(durationNs) - case r if r == RequestKeys.Fetch || r == RequestKeys.MultiFetch => + case r if r == RequestKeys.Fetch => fetchTimeStats.recordRequestMetric(durationNs) case _ => /* not collecting; let go */ } diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala index 1b0d928..457a983 100644 --- a/core/src/main/scala/kafka/network/Transmission.scala +++ b/core/src/main/scala/kafka/network/Transmission.scala @@ -66,14 +66,15 @@ trait Receive extends Transmission { trait Send extends Transmission { def writeTo(channel: GatheringByteChannel): Int - + def writeCompletely(channel: GatheringByteChannel): Int = { - var written = 0 + var totalWritten = 0 while(!complete) { - written = writeTo(channel) + val written = writeTo(channel) trace(written + " bytes written.") + totalWritten += written } - written + totalWritten } } @@ -99,9 +100,9 @@ abstract class MultiSend[S <: Send](val sends: List[S]) extends Send { if (current == Nil) { if (totalWritten != expectedBytesToWrite) error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten) - return true + true + } else { + false } - else - return 
false } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c9c2325..2c1f144 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,17 +17,17 @@ package kafka.server -import org.apache.log4j.Logger -import kafka.log._ -import kafka.network._ -import kafka.message._ +import java.io.IOException +import java.lang.IllegalStateException +import kafka.admin.{CreateTopicCommand, AdminUtils} import kafka.api._ import kafka.common.ErrorMapping -import java.io.IOException +import kafka.log._ +import kafka.message._ +import kafka.network._ import kafka.utils.{SystemTime, Logging} -import collection.mutable.ListBuffer -import kafka.admin.{CreateTopicCommand, AdminUtils} -import java.lang.IllegalStateException +import org.apache.log4j.Logger +import scala.collection.mutable.ListBuffer /** * Logic to handle the various Kafka requests @@ -39,13 +39,12 @@ class KafkaApis(val logManager: LogManager) extends Logging { def handle(receive: Receive): Option[Send] = { val apiId = receive.buffer.getShort() apiId match { - case RequestKeys.Produce => handleProducerRequest(receive) - case RequestKeys.Fetch => handleFetchRequest(receive) - case RequestKeys.MultiFetch => handleMultiFetchRequest(receive) - case RequestKeys.MultiProduce => handleMultiProducerRequest(receive) - case RequestKeys.Offsets => handleOffsetRequest(receive) - case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive) - case _ => throw new IllegalStateException("No mapping found for handler id " + apiId) + case RequestKeys.Produce => handleProducerRequest(receive) + case RequestKeys.Fetch => handleFetchRequest(receive) + case RequestKeys.MultiProduce => handleMultiProducerRequest(receive) + case RequestKeys.Offsets => handleOffsetRequest(receive) + case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive) + case _ => throw new IllegalStateException("No mapping found for handler id " + apiId) } } @@ -92,34 +91,37 @@ class KafkaApis(val logManager: LogManager) extends Logging { val fetchRequest = FetchRequest.readFrom(request.buffer) if(requestLogger.isTraceEnabled) requestLogger.trace("Fetch request " + fetchRequest.toString) - Some(readMessageSet(fetchRequest)) - } - - def handleMultiFetchRequest(request: Receive): Option[Send] = { - val multiFetchRequest = MultiFetchRequest.readFrom(request.buffer) - if(requestLogger.isTraceEnabled) - requestLogger.trace("Multifetch request") - multiFetchRequest.fetches.foreach(req => requestLogger.trace(req.toString)) - var responses = multiFetchRequest.fetches.map(fetch => - readMessageSet(fetch)).toList - - Some(new MultiMessageSetSend(responses)) + + val fetchedData = new ListBuffer[TopicData]() + var error: Int = ErrorMapping.NoError + + for(offsetDetail <- fetchRequest.offsetInfo) { + val info = new ListBuffer[PartitionData]() + val topic = offsetDetail.topic + val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes) + for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) { + val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match { + case Left(err) => error = err; new PartitionData(partition, err, offset, MessageSet.Empty) + case Right(messages) => new PartitionData(partition, ErrorMapping.NoError, offset, messages) + } + info.append(partitionInfo) + } + fetchedData.append(new TopicData(topic, info.toArray)) + } + val response 
= new FetchResponse(fetchRequest.correlationId, FetchRequest.CurrentVersion, error.shortValue(), fetchedData.toArray ) + Some(new FetchResponseSend(response)) } - private def readMessageSet(fetchRequest: FetchRequest): MessageSetSend = { - var response: MessageSetSend = null + private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Int, MessageSet] = { + var response: Either[Int, MessageSet] = null try { - trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition) - val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition) - if (log != null) - response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize)) - else - response = new MessageSetSend() - } - catch { + trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) + val log = logManager.getLog(topic, partition) + response = Right(if(log != null) log.read(offset, maxSize) else MessageSet.Empty) + } catch { case e => - error("error when processing request " + fetchRequest, e) - response=new MessageSetSend(MessageSet.Empty, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + error("error when processing request " + (topic, partition, offset, maxSize), e) + response = Left(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) } response } diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 3da51f4..a72c8d2 100644 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -35,9 +35,9 @@ class KafkaRequestHandler(val requestChannel: RequestChannel, val handle: (Recei case Some(send) => { val resp = new RequestChannel.Response(processor = req.processor, requestKey = req.requestKey, - response = send, - start = req.start, - ellapsed = -1) + response = send, + start = req.start, + elapsed = -1) requestChannel.sendResponse(resp) trace("Processor " + Thread.currentThread.getName + " sent response " + resp) } diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 74218ec..dc04180 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -19,7 +19,7 @@ package kafka.tools import java.net.URI import joptsimple._ -import kafka.api.FetchRequest +import kafka.api.FetchRequestBuilder import kafka.utils._ import kafka.consumer._ @@ -54,6 +54,11 @@ object SimpleConsumerShell extends Logging { .describedAs("fetchsize") .ofType(classOf[java.lang.Integer]) .defaultsTo(1000000) + val clientIdOpt = parser.accepts("clientId", "The ID of this client.") + .withOptionalArg + .describedAs("clientId") + .ofType(classOf[String]) + .defaultsTo("SimpleConsumerShell") val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator") .withOptionalArg .describedAs("print offsets") @@ -79,7 +84,8 @@ object SimpleConsumerShell extends Logging { val topic = options.valueOf(topicOpt) val partition = options.valueOf(partitionOpt).intValue val startingOffset = options.valueOf(offsetOpt).longValue - val fetchsize = options.valueOf(fetchsizeOpt).intValue + val fetchSize = options.valueOf(fetchsizeOpt).intValue + val clientId = options.valueOf(clientIdOpt).toString val printOffsets = if(options.has(printOffsetOpt)) true else false val 
printMessages = if(options.has(printMessageOpt)) true else false @@ -87,22 +93,27 @@ object SimpleConsumerShell extends Logging { val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024) val thread = Utils.newThread("kafka-consumer", new Runnable() { def run() { + var reqId = 0 var offset = startingOffset while(true) { - val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize) - val messageSets = consumer.multifetch(fetchRequest) - for (messages <- messageSets) { - debug("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset) - var consumed = 0 - for(messageAndOffset <- messages) { - if(printMessages) - info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) - offset = messageAndOffset.offset - if(printOffsets) - info("next offset = " + offset) - consumed += 1 - } + val fetchRequest = new FetchRequestBuilder() + .correlationId(reqId) + .clientId(clientId) + .addFetch(topic, partition, offset, fetchSize) + .build() + val fetchResponse = consumer.fetch(fetchRequest) + val messageSet = fetchResponse.messageSet(topic, partition) + debug("multi fetched " + messageSet.sizeInBytes + " bytes from offset " + offset) + var consumed = 0 + for(messageAndOffset <- messageSet) { + if(printMessages) + info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8")) + offset = messageAndOffset.offset + if(printOffsets) + info("next offset = " + offset) + consumed += 1 } + reqId += 1 } } }, false); diff --git a/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala b/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala index 789bd64..222cf0b 100644 --- a/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala +++ b/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala @@ -17,15 +17,17 @@ package kafka.integration -import kafka.server.KafkaConfig -import org.scalatest.junit.JUnit3Suite -import org.apache.log4j.Logger +import junit.framework.Assert._ import java.util.Properties + +import kafka.api.{FetchRequestBuilder, OffsetRequest} import kafka.consumer.SimpleConsumer -import kafka.api.{OffsetRequest, FetchRequest} -import junit.framework.Assert._ +import kafka.server.KafkaConfig import kafka.utils.TestUtils +import org.apache.log4j.Logger +import org.scalatest.junit.JUnit3Suite + class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness { val topic = "MagicByte0" @@ -62,9 +64,10 @@ class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness var messageCount: Int = 0 while(fetchOffset < lastOffset(0)) { - val fetched = simpleConsumer.fetch(new FetchRequest(topic, 0, fetchOffset, 10000)) - fetched.foreach(m => fetchOffset = m.offset) - messageCount += fetched.size + val fetched = simpleConsumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, fetchOffset, 10000).build()) + val fetchedMessages = fetched.messageSet(topic, 0) + fetchedMessages.foreach(m => fetchOffset = m.offset) + messageCount += fetchedMessages.size } assertEquals(100, messageCount) } diff --git a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala index 94fe9d5..284ac62 100644 --- a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala +++ b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala @@ -17,15 +17,15 @@ package kafka.integration -import scala.collection._ +import 
kafka.api.{FetchRequestBuilder, ProducerRequest} import kafka.common.OffsetOutOfRangeException -import kafka.api.{ProducerRequest, FetchRequest} +import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig} +import kafka.utils.{TestUtils, Utils} +import kafka.zk.ZooKeeperTestHarness import org.apache.log4j.{Level, Logger} import org.scalatest.junit.JUnit3Suite -import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} -import kafka.zk.ZooKeeperTestHarness -import kafka.utils.{TestUtils, Utils} +import scala.collection._ /** * End to end tests of the primitive apis against a local server @@ -65,54 +65,60 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness new Message("hello".getBytes()), new Message("there".getBytes())) producer.send(topic, sent) sent.getBuffer.rewind - var fetched: ByteBufferMessageSet = null - while(fetched == null || fetched.validBytes == 0) - fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - TestUtils.checkEquals(sent.iterator, fetched.iterator) + + var fetchedMessage: ByteBufferMessageSet = null + while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + fetchedMessage = fetched.messageSet(topic, 0).asInstanceOf[ByteBufferMessageSet] + } + TestUtils.checkEquals(sent.iterator, fetchedMessage.iterator) // send an invalid offset try { - val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000)) - fetchedWithError.iterator + val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build()) + fetchedWithError.messageSet(topic, 0).iterator fail("Expected an OffsetOutOfRangeException exception to be thrown") - } - catch { + } catch { case e: OffsetOutOfRangeException => } } def testProduceAndMultiFetch() { - // send some messages - val topics = List("test1", "test2", "test3"); + // send some messages, with non-ordered topics + val topicOffsets = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, offset) <- topicOffsets) { val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) - messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches += new FetchRequest(topic, 0, 0, 10000) + messages += topic -> set + builder.addFetch(topic, offset, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, offset) <- topicOffsets) { + val fetched = response.messageSet(topic, offset) + TestUtils.checkEquals(messages(topic).iterator, fetched.iterator) + } } { // send some invalid offsets - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, 0, -1, 10000) + val builder = new FetchRequestBuilder() + for( (topic, offset) <- topicOffsets ) + builder.addFetch(topic, offset, -1, 10000) - val responses = consumer.multifetch(fetches: _*) - for(resp <- responses) { + 
val request = builder.build() + val responses = consumer.fetch(request) + for( (topic, offset) <- topicOffsets ) { try { - resp.iterator + responses.messageSet(topic, offset).iterator fail("Expected an OffsetOutOfRangeException exception to be thrown") } catch { case e: OffsetOutOfRangeException => @@ -125,14 +131,14 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness // send some messages val topics = List("test1", "test2", "test3"); val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] + val builder = new FetchRequestBuilder() var produceList: List[ProducerRequest] = Nil for(topic <- topics) { val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set produceList ::= new ProducerRequest(topic, 0, set) - fetches += new FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, 0, 0, 10000) } producer.multiSend(produceList.toArray) @@ -141,23 +147,26 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + val request = builder.build() + val response = consumer.fetch(request) + for(topic <- topics) { + val fetched = response.messageSet(topic, 0) + TestUtils.checkEquals(messages(topic).iterator, fetched.iterator) + } } def testMultiProduceResend() { // send some messages val topics = List("test1", "test2", "test3"); val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] + val builder = new FetchRequestBuilder() var produceList: List[ProducerRequest] = Nil for(topic <- topics) { val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set produceList ::= new ProducerRequest(topic, 0, set) - fetches += new FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, 0, 0, 10000) } producer.multiSend(produceList.toArray) @@ -169,11 +178,13 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness // wait a bit for produced message to be available Thread.sleep(750) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) + val request = builder.build() + val response = consumer.fetch(request) + for(topic <- topics) { + val topicMessages = response.messageSet(topic, 0) TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).map(m => m.message).iterator, messages(topic).map(m => m.message).iterator), - resp.map(m => m.message).iterator) -// TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator, messages(topic).iterator), resp.iterator) + topicMessages.iterator.map(_.message)) + } } } diff --git a/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala b/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala index f7449e6..68f7c86 100644 --- a/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala @@ -20,16 +20,14 @@ package kafka.log import kafka.server.KafkaConfig import java.io.File import java.nio.ByteBuffer -import kafka.utils.Utils -import kafka.api.FetchRequest +import 
kafka.api.FetchRequestBuilder import kafka.common.InvalidMessageSizeException -import kafka.utils.TestUtils import kafka.consumer.{ZookeeperConsumerConnector, ConsumerConfig} +import kafka.integration.{KafkaServerTestHarness, ProducerConsumerTestHarness} +import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} +import kafka.utils.{Utils, TestUtils} import org.scalatest.junit.JUnit3Suite -import kafka.integration.ProducerConsumerTestHarness -import kafka.integration.KafkaServerTestHarness import org.apache.log4j.{Logger, Level} -import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness { val port = TestUtils.choosePort @@ -65,23 +63,21 @@ class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness wit Thread.sleep(500) // test SimpleConsumer - val messageSet = consumer.fetch(new FetchRequest(topic, partition, 0, 10000)) + val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, 10000).build()) try { - for (msg <- messageSet) + for (msg <- response.messageSet(topic, partition)) fail("shouldn't reach here in SimpleConsumer since log file is corrupted.") fail("shouldn't reach here in SimpleConsumer since log file is corrupted.") - } - catch { + } catch { case e: InvalidMessageSizeException => "This is good" } - val messageSet2 = consumer.fetch(new FetchRequest(topic, partition, 0, 10000)) + val response2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, 10000).build()) try { - for (msg <- messageSet2) + for (msg <- response2.messageSet(topic, partition)) fail("shouldn't reach here in SimpleConsumer since log file is corrupted.") fail("shouldn't reach here in SimpleConsumer since log file is corrupted.") - } - catch { + } catch { case e: InvalidMessageSizeException => println("This is good") } @@ -95,8 +91,7 @@ class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness wit for (message <- messageStreams(0)) fail("shouldn't reach here in ZookeeperConsumer since log file is corrupted.") fail("shouldn't reach here in ZookeeperConsumer since log file is corrupted.") - } - catch { + } catch { case e: InvalidMessageSizeException => "This is good" case e: Exception => "This is not bad too !" 
} diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 83ddfea..5819de0 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -18,18 +18,18 @@ package kafka.integration import scala.collection._ +import java.io.File +import java.util.Properties import junit.framework.Assert._ -import kafka.api.{ProducerRequest, FetchRequest} +import kafka.api.{FetchRequestBuilder, ProducerRequest} import kafka.common.{OffsetOutOfRangeException, InvalidPartitionException} -import kafka.server.{KafkaRequestHandler, KafkaConfig} -import org.apache.log4j.{Level, Logger} -import org.scalatest.junit.JUnit3Suite -import java.util.Properties +import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet} import kafka.producer.{ProducerData, Producer, ProducerConfig} import kafka.serializer.StringDecoder -import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet} -import java.io.File +import kafka.server.{KafkaRequestHandler, KafkaConfig} import kafka.utils.TestUtils +import org.apache.log4j.{Level, Logger} +import org.scalatest.junit.JUnit3Suite /** * End to end tests of the primitive apis against a local server @@ -55,10 +55,11 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test-message"))) Thread.sleep(200) - var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - assertTrue(fetched.iterator.hasNext) + val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + val messageSet = fetched.messageSet(topic, 0) + assertTrue(messageSet.iterator.hasNext) - val fetchedMessageAndOffset = fetched.iterator.next + val fetchedMessageAndOffset = messageSet.head val stringDecoder = new StringDecoder val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message) assertEquals("test-message", fetchedStringMessage) @@ -76,10 +77,11 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test-message"))) Thread.sleep(200) - var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - assertTrue(fetched.iterator.hasNext) + var fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + val messageSet = fetched.messageSet(topic, 0) + assertTrue(messageSet.iterator.hasNext) - val fetchedMessageAndOffset = fetched.iterator.next + val fetchedMessageAndOffset = messageSet.head val stringDecoder = new StringDecoder val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message) assertEquals("test-message", fetchedStringMessage) @@ -87,24 +89,27 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testProduceAndMultiFetch() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" 
+ topic).getBytes)) messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches += new FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, partition, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(700) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val fetched = response.messageSet(topic, partition) + TestUtils.checkEquals(messages(topic).iterator, fetched.iterator) + } } // temporarily set request handler logger to a higher level @@ -112,34 +117,34 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with { // send some invalid offsets - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, 0, -1, 10000) + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) + builder.addFetch(topic, partition, -1, 10000) try { - val responses = consumer.multifetch(fetches: _*) - for(resp <- responses) - resp.iterator - fail("expect exception") - } - catch { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) + response.messageSet(topic, partition).iterator + fail("Expected exception when fetching message with invalid offset") + } catch { case e: OffsetOutOfRangeException => "this is good" } } { // send some invalid partitions - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, -1, 0, 10000) + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) + builder.addFetch(topic, -1, 0, 10000) try { - val responses = consumer.multifetch(fetches: _*) - for(resp <- responses) - resp.iterator - fail("expect exception") - } - catch { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) + response.messageSet(topic, -1).iterator + fail("Expected exception when fetching message with invalid partition") + } catch { case e: InvalidPartitionException => "this is good" } } @@ -150,24 +155,27 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testProduceAndMultiFetchWithCompression() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(DefaultCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches += new FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, partition, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val fetched = response.messageSet(topic, partition) + TestUtils.checkEquals(messages(topic).iterator, 
fetched.iterator) + } } // temporarily set request handler logger to a higher level @@ -175,34 +183,34 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with { // send some invalid offsets - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, 0, -1, 10000) + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) + builder.addFetch(topic, partition, -1, 10000) try { - val responses = consumer.multifetch(fetches: _*) - for(resp <- responses) - resp.iterator - fail("expect exception") - } - catch { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) + response.messageSet(topic, partition).iterator + fail("Expected exception when fetching message with invalid offset") + } catch { case e: OffsetOutOfRangeException => "this is good" } } { // send some invalid partitions - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, -1, 0, 10000) + val builder = new FetchRequestBuilder() + for( (topic, _) <- topics) + builder.addFetch(topic, -1, 0, 10000) try { - val responses = consumer.multifetch(fetches: _*) - for(resp <- responses) - resp.iterator - fail("expect exception") - } - catch { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, _) <- topics) + response.messageSet(topic, -1).iterator + fail("Expected exception when fetching message with invalid partition") + } catch { case e: InvalidPartitionException => "this is good" } } @@ -213,16 +221,16 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testMultiProduce() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] + val builder = new FetchRequestBuilder() var produceList: List[ProducerRequest] = Nil - for(topic <- topics) { + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set produceList ::= new ProducerRequest(topic, 0, set) - fetches += new FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, partition, 0, 10000) } producer.multiSend(produceList.toArray) @@ -231,23 +239,26 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val fetched = response.messageSet(topic, partition) + TestUtils.checkEquals(messages(topic).iterator, fetched.iterator) + } } def testMultiProduceWithCompression() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] + val builder = new FetchRequestBuilder() var produceList: List[ProducerRequest] = Nil - for(topic <- topics) { + for( (topic, partition) <- topics) { val set = new 
ByteBufferMessageSet(DefaultCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set produceList ::= new ProducerRequest(topic, 0, set) - fetches += new FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, partition, 0, 10000) } producer.multiSend(produceList.toArray) @@ -256,15 +267,18 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches: _*) - for((topic, resp) <- topics.zip(response.toList)) - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val fetched = response.messageSet(topic, 0) + TestUtils.checkEquals(messages(topic).iterator, fetched.iterator) + } } def testConsumerNotExistTopic() { val newTopic = "new-topic" - val messageSetIter = consumer.fetch(new FetchRequest(newTopic, 0, 0, 10000)).iterator - assertTrue(messageSetIter.hasNext == false) + val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build()) + assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext) val logFile = new File(config.logDir, newTopic + "-0") assertTrue(!logFile.exists) } diff --git a/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala index e423ced..ad26120 100644 --- a/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala @@ -18,15 +18,15 @@ package kafka.javaapi.integration import scala.collection._ -import kafka.api.FetchRequest +import kafka.api.FetchRequestBuilder import kafka.common.{InvalidPartitionException, OffsetOutOfRangeException} -import kafka.server.{KafkaRequestHandler, KafkaConfig} -import org.apache.log4j.{Level, Logger} -import org.scalatest.junit.JUnit3Suite -import kafka.javaapi.message.ByteBufferMessageSet import kafka.javaapi.ProducerRequest +import kafka.javaapi.message.ByteBufferMessageSet import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message} +import kafka.server.{KafkaRequestHandler, KafkaConfig} import kafka.utils.TestUtils +import org.apache.log4j.{Level, Logger} +import org.scalatest.junit.JUnit3Suite /** * End to end tests of the primitive apis against a local server @@ -43,39 +43,42 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with // send some messages val topic = "test" -// send an empty messageset first - val sent2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = getMessageList(Seq.empty[Message]: _*)) + // send an empty messageset first + val sent2 = new ByteBufferMessageSet(NoCompressionCodec, getMessageList(Seq.empty[Message]: _*)) producer.send(topic, sent2) + Thread.sleep(200) sent2.getBuffer.rewind - var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - TestUtils.checkEquals(sent2.iterator, fetched2.iterator) + val fetched2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + val fetchedMessage2 = fetched2.messageSet(topic, 0) + TestUtils.checkEquals(sent2.iterator, fetchedMessage2.iterator) // send some messages - val sent3 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, - messages = getMessageList(new Message("hello".getBytes()), - new 
Message("there".getBytes()))) + val sent3 = new ByteBufferMessageSet(NoCompressionCodec, + getMessageList( + new Message("hello".getBytes()),new Message("there".getBytes()))) producer.send(topic, sent3) Thread.sleep(200) sent3.getBuffer.rewind - var fetched3: ByteBufferMessageSet = null - while(fetched3 == null || fetched3.validBytes == 0) - fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - TestUtils.checkEquals(sent3.iterator, fetched3.iterator) + var messageSet: ByteBufferMessageSet = null + while(messageSet == null || messageSet.validBytes == 0) { + val fetched3 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + messageSet = fetched3.messageSet(topic, 0).asInstanceOf[ByteBufferMessageSet] + } + TestUtils.checkEquals(sent3.iterator, messageSet.iterator) // temporarily set request handler logger to a higher level requestHandlerLogger.setLevel(Level.FATAL) // send an invalid offset try { - val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000)) - fetchedWithError.iterator - fail("expect exception") - } - catch { + val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build()) + val messageWithError = fetchedWithError.messageSet(topic, 0) + messageWithError.iterator + fail("Fetch with invalid offset should throw an exception when iterating over response") + } catch { case e: OffsetOutOfRangeException => "this is good" } @@ -87,39 +90,42 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with // send some messages val topic = "test" -// send an empty messageset first - val sent2 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, - messages = getMessageList(Seq.empty[Message]: _*)) + // send an empty messageset first + val sent2 = new ByteBufferMessageSet(DefaultCompressionCodec, getMessageList(Seq.empty[Message]: _*)) producer.send(topic, sent2) + Thread.sleep(200) sent2.getBuffer.rewind - var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - TestUtils.checkEquals(sent2.iterator, fetched2.iterator) + val fetched2 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + val message2 = fetched2.messageSet(topic, 0) + TestUtils.checkEquals(sent2.iterator, message2.iterator) // send some messages - val sent3 = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, - messages = getMessageList(new Message("hello".getBytes()), - new Message("there".getBytes()))) + val sent3 = new ByteBufferMessageSet( DefaultCompressionCodec, + getMessageList( + new Message("hello".getBytes()),new Message("there".getBytes()))) producer.send(topic, sent3) Thread.sleep(200) sent3.getBuffer.rewind - var fetched3: ByteBufferMessageSet = null - while(fetched3 == null || fetched3.validBytes == 0) - fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - TestUtils.checkEquals(sent3.iterator, fetched3.iterator) + var fetchedMessage: ByteBufferMessageSet = null + while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + val fetched3 = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + fetchedMessage = fetched3.messageSet(topic, 0).asInstanceOf[ByteBufferMessageSet] + } + TestUtils.checkEquals(sent3.iterator, fetchedMessage.iterator) // temporarily set request handler logger to a higher level requestHandlerLogger.setLevel(Level.FATAL) // send an invalid offset try { - val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000)) - 
fetchedWithError.iterator - fail("expect exception") - } - catch { + val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build()) + val messageWithError = fetchedWithError.messageSet(topic, 0) + messageWithError.iterator + fail("Fetch with invalid offset should throw an exception when iterating over response") + } catch { case e: OffsetOutOfRangeException => "this is good" } @@ -129,31 +135,27 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testProduceAndMultiFetch() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches += new FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, partition, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = response.iterator - for(topic <- topics) { - if (iter.hasNext) { - val resp = iter.next - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) - } - else - fail("fewer responses than expected") + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val messageSet = response.messageSet(topic, partition) + TestUtils.checkEquals(messages(topic).iterator, messageSet.iterator) } } @@ -162,37 +164,41 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with { // send some invalid offsets - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, 0, -1, 10000) - - try { - val responses = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = responses.iterator - while (iter.hasNext) - iter.next.iterator - fail("expect exception") - } - catch { - case e: OffsetOutOfRangeException => "this is good" + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) + builder.addFetch(topic, partition, -1, 10000) + + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + try { + val iter = response.messageSet(topic, partition).iterator + while (iter.hasNext) + iter.next + fail("MessageSet for invalid offset should throw exception") + } catch { + case e: OffsetOutOfRangeException => "this is good" + } } } { // send some invalid partitions - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, -1, 0, 10000) - - try { - val responses = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = responses.iterator - while (iter.hasNext) - iter.next.iterator - fail("expect exception") - } - catch { - case e: InvalidPartitionException => "this is good" + val builder = new FetchRequestBuilder() + for( (topic, _) <- topics) + builder.addFetch(topic, -1, 0, 10000) + + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, _) <- topics) { + try { + val iter = response.messageSet(topic, 
-1).iterator + while (iter.hasNext) + iter.next + fail("MessageSet for invalid partition should throw exception") + } catch { + case e: InvalidPartitionException => "this is good" + } } } @@ -202,31 +208,31 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testProduceAndMultiFetchWithCompression() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches += new FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, partition, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = response.iterator - for(topic <- topics) { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val iter = response.messageSet(topic, partition).iterator if (iter.hasNext) { - val resp = iter.next - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) - } - else + TestUtils.checkEquals(messages(topic).iterator, iter) + } else { fail("fewer responses than expected") + } } } @@ -235,37 +241,41 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with { // send some invalid offsets - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, 0, -1, 10000) - - try { - val responses = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = responses.iterator - while (iter.hasNext) - iter.next.iterator - fail("expect exception") - } - catch { - case e: OffsetOutOfRangeException => "this is good" + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) + builder.addFetch(topic, partition, -1, 10000) + + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + try { + val iter = response.messageSet(topic, partition).iterator + while (iter.hasNext) + iter.next + fail("Expected exception when fetching invalid offset") + } catch { + case e: OffsetOutOfRangeException => "this is good" + } } } { // send some invalid partitions - val fetches = new mutable.ArrayBuffer[FetchRequest] - for(topic <- topics) - fetches += new FetchRequest(topic, -1, 0, 10000) - - try { - val responses = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = responses.iterator - while (iter.hasNext) - iter.next.iterator - fail("expect exception") - } - catch { - case e: InvalidPartitionException => "this is good" + val builder = new FetchRequestBuilder() + for( (topic, _) <- topics) + builder.addFetch(topic, -1, 0, 10000) + + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, _) <- topics) { + try { + val iter = response.messageSet(topic, -1).iterator + while (iter.hasNext) + iter.next + fail("Expected exception when fetching invalid partition") + } catch { + case e: InvalidPartitionException => "this is good" + } } } @@ -275,79 +285,75 @@ class 
PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testProduceAndMultiFetchJava() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches : java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches.add(new FetchRequest(topic, 0, 0, 10000)) + builder.addFetch(topic, partition, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches) - val iter = response.iterator - for(topic <- topics) { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val iter = response.messageSet(topic, partition).iterator if (iter.hasNext) { - val resp = iter.next - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) - } - else + TestUtils.checkEquals(messages(topic).iterator, iter) + } else { fail("fewer responses than expected") + } } } } def testProduceAndMultiFetchJavaWithCompression() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); { val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches : java.util.ArrayList[FetchRequest] = new java.util.ArrayList[FetchRequest] - for(topic <- topics) { + val builder = new FetchRequestBuilder() + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set producer.send(topic, set) set.getBuffer.rewind - fetches.add(new FetchRequest(topic, 0, 0, 10000)) + builder.addFetch(topic, partition, 0, 10000) } // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(fetches) - val iter = response.iterator - for(topic <- topics) { - if (iter.hasNext) { - val resp = iter.next - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) - } - else - fail("fewer responses than expected") + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val iter = response.messageSet(topic, partition).iterator + TestUtils.checkEquals(messages(topic).iterator, iter) } } } def testMultiProduce() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] + val builder = new FetchRequestBuilder() var produceList: List[ProducerRequest] = Nil - for(topic <- topics) { + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set produceList ::= new ProducerRequest(topic, 0, set) - fetches += new 
FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, partition, 0, 10000) } producer.multiSend(produceList.toArray) @@ -356,31 +362,31 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = response.iterator - for(topic <- topics) { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val iter = response.messageSet(topic, partition).iterator if (iter.hasNext) { - val resp = iter.next - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) - } - else + TestUtils.checkEquals(messages(topic).iterator, iter) + } else { fail("fewer responses than expected") + } } } def testMultiProduceWithCompression() { // send some messages - val topics = List("test1", "test2", "test3"); + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val messages = new mutable.HashMap[String, ByteBufferMessageSet] - val fetches = new mutable.ArrayBuffer[FetchRequest] + val builder = new FetchRequestBuilder() var produceList: List[ProducerRequest] = Nil - for(topic <- topics) { + for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set produceList ::= new ProducerRequest(topic, 0, set) - fetches += new FetchRequest(topic, 0, 0, 10000) + builder.addFetch(topic, partition, 0, 10000) } producer.multiSend(produceList.toArray) @@ -389,15 +395,15 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with // wait a bit for produced message to be available Thread.sleep(200) - val response = consumer.multifetch(getFetchRequestList(fetches: _*)) - val iter = response.iterator - for(topic <- topics) { + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val iter = response.messageSet(topic, partition).iterator if (iter.hasNext) { - val resp = iter.next - TestUtils.checkEquals(messages(topic).iterator, resp.iterator) - } - else + TestUtils.checkEquals(messages(topic).iterator, iter) + } else { fail("fewer responses than expected") + } } } @@ -406,10 +412,4 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with messages.foreach(m => messageList.add(m)) messageList } - - private def getFetchRequestList(fetches: FetchRequest*): java.util.List[FetchRequest] = { - val fetchReqs = new java.util.ArrayList[FetchRequest]() - fetches.foreach(f => fetchReqs.add(f)) - fetchReqs - } } diff --git a/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala index f2f43c3..5a8cc8f 100644 --- a/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala @@ -36,8 +36,8 @@ import kafka.producer.async.AsyncProducer import kafka.javaapi.Implicits._ import kafka.serializer.{StringEncoder, Encoder} import kafka.javaapi.consumer.SimpleConsumer -import kafka.api.FetchRequest import kafka.message.{NoCompressionCodec, Message} +import kafka.api.FetchRequestBuilder class ProducerTest extends JUnitSuite { @@ -379,8 +379,8 @@ class ProducerTest extends JUnitSuite { asyncProducers) val producer = new Producer[String, String](config, 
partitioner, producerPool, false) - val messagesContent = new java.util.ArrayList[String] - messagesContent.add("test1") + val messagesContent = new java.util.ArrayList[String]{{ add("test1"); }} +// messagesContent.add("test1") producer.send(new ProducerData[String, String](topic, "test1", messagesContent)) producer.close @@ -405,12 +405,14 @@ class ProducerTest extends JUnitSuite { producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1")))) Thread.sleep(100) // cross check if brokers got the messages - val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message) - val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message) + val fetchResponse1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet1 = fetchResponse1.messageSet("new-topic", 0) + Assert.assertTrue("Message set should have 1 message", messageSet1.iterator.hasNext) + Assert.assertEquals(new Message("test1".getBytes), messageSet1.iterator.next.message) + val fetchRespone2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet2 = fetchRespone2.messageSet("new-topic", 0) + Assert.assertTrue("Message set should have 1 message", messageSet2.iterator.hasNext) + Assert.assertEquals(new Message("test1".getBytes), messageSet2.iterator.next.message) } catch { case e: Exception => fail("Not expected") } @@ -438,11 +440,12 @@ class ProducerTest extends JUnitSuite { producer.send(new ProducerData[String, String]("new-topic", "test", asList(Array("test1")))) Thread.sleep(100) // cross check if brokers got the messages - val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message) - Assert.assertTrue("Message set should have another message", messageSet1.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message) + val fetchResponse1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageIter = fetchResponse1.messageSet("new-topic", 0).iterator + Assert.assertTrue("Message set should have 1 message", messageIter.hasNext) + Assert.assertEquals(new Message("test1".getBytes), messageIter.next.message) + Assert.assertTrue("Message set should have another message", messageIter.hasNext) + Assert.assertEquals(new Message("test1".getBytes), messageIter.next.message) } catch { case e: Exception => fail("Not expected") } diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala index 47b5bcf..6bc92b9 100644 --- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala @@ -22,7 +22,6 @@ import kafka.utils._ import kafka.server.{KafkaConfig, KafkaServer} import junit.framework.Assert._ import java.util.{Random, Properties} -import kafka.api.{FetchRequest, OffsetRequest} import collection.mutable.WrappedArray import kafka.consumer.SimpleConsumer import org.junit.{After, Before, Test} @@ 
-30,6 +29,7 @@ import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} import org.apache.log4j._ import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite +import kafka.api.{FetchRequestBuilder, OffsetRequest} object LogOffsetTest { val random = new Random() @@ -66,9 +66,8 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { @Test def testEmptyLogs() { - val messageSet: ByteBufferMessageSet = simpleConsumer.fetch( - new FetchRequest("test", 0, 0, 300 * 1024)) - assertFalse(messageSet.iterator.hasNext) + val fetchResponse = simpleConsumer.fetch(new FetchRequestBuilder().addFetch("test", 0, 0, 300 * 1024).build()) + assertFalse(fetchResponse.messageSet("test", 0).iterator.hasNext) val name = "test" val logFile = new File(logDir, name + "-0") @@ -119,9 +118,9 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long])) // try to fetch using latest offset - val messageSet: ByteBufferMessageSet = simpleConsumer.fetch( - new FetchRequest(topic, 0, consumerOffsets.head, 300 * 1024)) - assertFalse(messageSet.iterator.hasNext) + val fetchResponse = simpleConsumer.fetch( + new FetchRequestBuilder().addFetch(topic, 0, consumerOffsets.head, 300 * 1024).build()) + assertFalse(fetchResponse.messageSet(topic, 0).iterator.hasNext) } @Test diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index b67877a..290caf9 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -17,23 +17,21 @@ package kafka.log4j -import org.apache.log4j.spi.LoggingEvent -import org.apache.log4j.{PropertyConfigurator, Logger} import java.util.Properties import java.io.File -import kafka.consumer.SimpleConsumer -import kafka.server.{KafkaConfig, KafkaServer} -import kafka.utils.TestZKUtils -import kafka.zk.EmbeddedZookeeper import junit.framework.Assert._ -import kafka.api.FetchRequest -import kafka.serializer.Encoder +import kafka.api.FetchRequestBuilder +import kafka.consumer.SimpleConsumer import kafka.message.Message import kafka.producer.async.MissingConfigException +import kafka.serializer.Encoder +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.{TestUtils, TestZKUtils, Utils, Logging} +import kafka.zk.{EmbeddedZookeeper, ZooKeeperTestHarness} +import org.apache.log4j.spi.LoggingEvent +import org.apache.log4j.{PropertyConfigurator, Logger} import org.junit.{After, Before, Test} import org.scalatest.junit.JUnit3Suite -import kafka.zk.ZooKeeperTestHarness -import kafka.utils.{TestUtils, Utils, Logging} class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @@ -172,10 +170,10 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Thread.sleep(2500) var offset = 0L - val messages = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, offset, 1024*1024)) - + val response = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, offset, 1024*1024).build()) + val fetchedMessage = response.messageSet("test-topic", 0) var count = 0 - for(message <- messages) { + for(message <- fetchedMessage) { count = count + 1 offset += message.offset } @@ -192,14 +190,16 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Thread.sleep(500) - val messages = 
simpleConsumerZk.fetch(new FetchRequest("test-topic", 0, 0L, 1024*1024)) + val response = simpleConsumerZk.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build()) + val fetchMessage = response.messageSet("test-topic", 0) var count = 0 - for(message <- messages) { + for(message <- fetchMessage) { count = count + 1 } - val messagesFromOtherBroker = simpleConsumerBl.fetch(new FetchRequest("test-topic", 0, 0L, 1024*1024)) + val response2 = simpleConsumerBl.fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 1024*1024).build()) + val messagesFromOtherBroker = response2.messageSet("test-topic", 0) for(message <- messagesFromOtherBroker) { count = count + 1 diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 46af07f..7afed69 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -32,8 +32,8 @@ import kafka.common.{InvalidConfigException, UnavailableProducerException, Inval import kafka.utils.{TestUtils, TestZKUtils, Utils} import kafka.serializer.{StringEncoder, Encoder} import kafka.consumer.SimpleConsumer -import kafka.api.FetchRequest import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message} +import kafka.api.{FetchRequestBuilder, FetchRequest} class ProducerTest extends JUnitSuite { private val topic = "test-topic" @@ -411,12 +411,14 @@ class ProducerTest extends JUnitSuite { producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) Thread.sleep(100) // cross check if brokers got the messages - val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message) - val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message) + val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet1 = response1.messageSet("new-topic", 0) + Assert.assertTrue("Message set should have 1 message", messageSet1.iterator.hasNext) + Assert.assertEquals(new Message("test1".getBytes), messageSet1.head.message) + val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet2 = response2.messageSet("new-topic", 0) + Assert.assertTrue("Message set should have 1 message", messageSet2.iterator.hasNext) + Assert.assertEquals(new Message("test1".getBytes), messageSet2.head.message) } catch { case e: Exception => fail("Not expected", e) } @@ -448,11 +450,12 @@ class ProducerTest extends JUnitSuite { producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1"))) Thread.sleep(100) // cross check if brokers got the messages - val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message) - Assert.assertTrue("Message set should have another message", messageSet1.hasNext) - Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message) + val response1 = consumer1.fetch(new 
FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet1Iter = response1.messageSet("new-topic", 0).iterator + Assert.assertTrue("Message set should have 1 message", messageSet1Iter.hasNext) + Assert.assertEquals(new Message("test1".getBytes), messageSet1Iter.next.message) + Assert.assertTrue("Message set should have another message", messageSet1Iter.hasNext) + Assert.assertEquals(new Message("test1".getBytes), messageSet1Iter.next.message) } catch { case e: Exception => fail("Not expected") } @@ -481,9 +484,10 @@ class ProducerTest extends JUnitSuite { producer.send(new ProducerData[String, String]("new-topic", "test", Array("test"))) Thread.sleep(100) // cross check if brokers got the messages - val messageSet1 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext) - Assert.assertEquals(new Message("test".getBytes), messageSet1.next.message) + val response1 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet1 = response1.messageSet("new-topic", 0) + Assert.assertTrue("Message set should have 1 message", messageSet1.iterator.hasNext) + Assert.assertEquals(new Message("test".getBytes), messageSet1.head.message) // shutdown server2 server2.shutdown @@ -504,9 +508,10 @@ class ProducerTest extends JUnitSuite { Thread.sleep(100) // cross check if brokers got the messages - val messageSet2 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator - Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext) - Assert.assertEquals(new Message("test".getBytes), messageSet2.next.message) + val response2 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build()) + val messageSet2 = response2.messageSet("new-topic", 0) + Assert.assertTrue("Message set should have 1 message", messageSet2.iterator.hasNext) + Assert.assertEquals(new Message("test".getBytes), messageSet2.head.message) } catch { case e: Exception => fail("Not expected", e) diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 6dbaba1..5883c41 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -17,7 +17,6 @@ package kafka.server import java.io.File -import kafka.api.FetchRequest import kafka.producer.{SyncProducer, SyncProducerConfig} import kafka.consumer.SimpleConsumer import java.util.Properties @@ -27,6 +26,7 @@ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} import org.scalatest.junit.JUnit3Suite import kafka.zk.ZooKeeperTestHarness import kafka.utils.{TestUtils, Utils} +import kafka.api.{FetchResponse, FetchRequestBuilder, FetchRequest} class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { val port = TestUtils.choosePort @@ -82,11 +82,13 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { server.startup() // bring the server back again and read the messages - var fetched: ByteBufferMessageSet = null - while(fetched == null || fetched.validBytes == 0) - fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000)) - TestUtils.checkEquals(sent1.iterator, fetched.iterator) - val newOffset = fetched.validBytes + var fetchedMessage: ByteBufferMessageSet = null + while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + val fetched = 
consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build()) + fetchedMessage = fetched.messageSet(topic, 0).asInstanceOf[ByteBufferMessageSet] + } + TestUtils.checkEquals(sent1.iterator, fetchedMessage.iterator) + val newOffset = fetchedMessage.validBytes // send some more messages producer.send(topic, sent2) @@ -94,10 +96,12 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { Thread.sleep(200) - fetched = null - while(fetched == null || fetched.validBytes == 0) - fetched = consumer.fetch(new FetchRequest(topic, 0, newOffset, 10000)) - TestUtils.checkEquals(sent2.map(m => m.message).iterator, fetched.map(m => m.message).iterator) + fetchedMessage = null + while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build()) + fetchedMessage = fetched.messageSet(topic, 0).asInstanceOf[ByteBufferMessageSet] + } + TestUtils.checkEquals(sent2.map(m => m.message).iterator, fetchedMessage.map(m => m.message).iterator) server.shutdown() Utils.rm(server.config.logDir) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index b72d0d9..84a9fd0 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -184,23 +184,21 @@ object TestUtils { length += 1 assertEquals(expected.next, actual.next) } - - if (expected.hasNext) - { + + // check if the expected iterator is longer + if (expected.hasNext) { var length1 = length; - while (expected.hasNext) - { + while (expected.hasNext) { expected.next length1 += 1 } assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true); } - - if (actual.hasNext) - { + + // check if the actual iterator was longer + if (actual.hasNext) { var length2 = length; - while (actual.hasNext) - { + while (actual.hasNext) { actual.next length2 += 1 } diff --git a/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala b/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala index d53d1ad..374182c 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala @@ -77,7 +77,7 @@ class ZKLoadBalanceTest extends JUnit3Suite with ZooKeeperTestHarness { // wait a bit to make sure rebalancing logic is triggered - Thread.sleep(1000) + Thread.sleep(1500) // check Partition Owner Registry val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir) val expected_3 = List( ("200-0", "group1_consumer1-0"), diff --git a/examples/src/main/java/kafka/examples/KafkaProperties.java b/examples/src/main/java/kafka/examples/KafkaProperties.java index d9a2104..9d1cd31 100644 --- a/examples/src/main/java/kafka/examples/KafkaProperties.java +++ b/examples/src/main/java/kafka/examples/KafkaProperties.java @@ -28,4 +28,5 @@ public interface KafkaProperties final static int reconnectInterval = 10000; final static String topic2 = "topic2"; final static String topic3 = "topic3"; + final static String clientId = "SimpleConsumerDemoClient"; } diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java index 1cb8a83..a765e0b 100644 --- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java +++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java @@ -16,71 +16,76 @@ */ package kafka.examples; -import java.util.ArrayList; -import 
java.util.List; - -import kafka.javaapi.MultiFetchResponse; +import kafka.api.FetchRequest; +import kafka.api.FetchRequestBuilder; +import kafka.javaapi.FetchResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.javaapi.message.MessageSet; import kafka.message.MessageAndOffset; -import scala.collection.Iterator; -import kafka.api.FetchRequest; -import kafka.message.Message; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; -public class SimpleConsumerDemo -{ - private static void printMessages(ByteBufferMessageSet messageSet) - { +public class SimpleConsumerDemo { + + private static void printMessages(ByteBufferMessageSet messageSet) { for (MessageAndOffset messageAndOffset : messageSet) { System.out.println(ExampleUtils.getMessage(messageAndOffset.message())); } } - private static void generateData() - { + private static void generateData() { Producer producer2 = new Producer(KafkaProperties.topic2); producer2.start(); Producer producer3 = new Producer(KafkaProperties.topic3); producer3.start(); - try - { + try { Thread.sleep(1000); - } - catch (InterruptedException e) - { + } catch (InterruptedException e) { e.printStackTrace(); } } - public static void main(String[] args) - { - + public static void main(String[] args) { generateData(); + SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL, KafkaProperties.kafkaServerPort, KafkaProperties.connectionTimeOut, KafkaProperties.kafkaProducerBufferSize); System.out.println("Testing single fetch"); - FetchRequest req = new FetchRequest(KafkaProperties.topic2, 0, 0L, 100); - ByteBufferMessageSet messageSet = simpleConsumer.fetch(req); - printMessages(messageSet); + FetchRequest req = new FetchRequestBuilder() + .correlationId(0) + .clientId(KafkaProperties.clientId) + .addFetch(KafkaProperties.topic2, 0, 0L, 100) + .build(); + FetchResponse fetchResponse = simpleConsumer.fetch(req); + printMessages((ByteBufferMessageSet) fetchResponse.messageSet(KafkaProperties.topic2, 0)); System.out.println("Testing single multi-fetch"); - req = new FetchRequest(KafkaProperties.topic2, 0, 0L, 100); - List list = new ArrayList(); - list.add(req); - req = new FetchRequest(KafkaProperties.topic3, 0, 0L, 100); - list.add(req); - MultiFetchResponse response = simpleConsumer.multifetch(list); + Map> topicMap = new HashMap>() {{ + put(KafkaProperties.topic2, new ArrayList(){{ add(0); }}); + put(KafkaProperties.topic3, new ArrayList(){{ add(0); }}); + }}; + req = new FetchRequestBuilder() + .correlationId(0) + .clientId(KafkaProperties.clientId) + .addFetch(KafkaProperties.topic2, 0, 0L, 100) + .addFetch(KafkaProperties.topic3, 0, 0L, 100) + .build(); + fetchResponse = simpleConsumer.fetch(req); int fetchReq = 0; - for (ByteBufferMessageSet resMessageSet : response ) - { - System.out.println("Response from fetch request no: " + ++fetchReq); - printMessages(resMessageSet); + for ( Map.Entry> entry : topicMap.entrySet() ) { + String topic = entry.getKey(); + for ( Integer offset : entry.getValue()) { + System.out.println("Response from fetch request no: " + ++fetchReq); + printMessages((ByteBufferMessageSet) fetchResponse.messageSet(topic, offset)); + } } } - } diff --git a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala index 02c3008..955d116 100644 --- a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala +++ 
b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala @@ -18,13 +18,12 @@ package kafka.perf import java.net.URI -import joptsimple._ -import kafka.utils._ -import kafka.server._ +import java.text.SimpleDateFormat +import kafka.api.{FetchRequestBuilder, OffsetRequest} import kafka.consumer.SimpleConsumer +import kafka.utils._ import org.apache.log4j.Logger -import kafka.api.{OffsetRequest, FetchRequest} -import java.text.SimpleDateFormat +import kafka.message.ByteBufferMessageSet /** * Performance test for the simple consumer @@ -56,12 +55,20 @@ object SimpleConsumerPerformance { var lastReportTime: Long = startMs var lastBytesRead = 0L var lastMessagesRead = 0L + var reqId = 0 while(!done) { - val messages = consumer.fetch(new FetchRequest(config.topic, config.partition, offset, config.fetchSize)) + // TODO: add in the maxWait and minBytes for performance + val request = new FetchRequestBuilder() + .correlationId(reqId) + .clientId(config.clientId) + .addFetch(config.topic, config.partition, offset, config.fetchSize) + .build() + val fetchResponse = consumer.fetch(request) + var messagesRead = 0 var bytesRead = 0 - - for(message <- messages) { + val messageSet = fetchResponse.messageSet(config.topic, config.partition).asInstanceOf[ByteBufferMessageSet] + for (message <- messageSet) { messagesRead += 1 bytesRead += message.message.payloadSize } @@ -69,7 +76,8 @@ object SimpleConsumerPerformance { if(messagesRead == 0 || totalMessagesRead > config.numMessages) done = true else - offset += messages.validBytes + // we only did one fetch so we find the offset for the first (head) messageset + offset += messageSet.validBytes totalBytesRead += bytesRead totalMessagesRead += messagesRead @@ -89,6 +97,7 @@ object SimpleConsumerPerformance { lastMessagesRead = totalMessagesRead consumedInterval = 0 } + reqId += 1 } val reportTime = System.currentTimeMillis val elapsed = (reportTime - startMs) / 1000.0 @@ -119,6 +128,11 @@ object SimpleConsumerPerformance { .describedAs("bytes") .ofType(classOf[java.lang.Integer]) .defaultsTo(1024*1024) + val clientIdOpt = parser.accepts("clientId", "The ID of this client.") + .withOptionalArg + .describedAs("clientId") + .ofType(classOf[String]) + .defaultsTo("SimpleConsumerPerformanceClient") val options = parser.parse(args : _*) @@ -139,5 +153,6 @@ object SimpleConsumerPerformance { val showDetailedStats = options.has(showDetailedStatsOpt) val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) val hideHeader = options.has(hideHeaderOpt) + val clientId = options.valueOf(clientIdOpt).toString } } diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala new file mode 100644 index 0000000..9b10b44 --- /dev/null +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import java.nio.channels.GatheringByteChannel +import kafka.common.ErrorMapping +import kafka.message.{MessageSet, ByteBufferMessageSet} +import kafka.network.{MultiSend, Send} +import kafka.utils.Utils + +object PartitionData { + def readFrom(buffer: ByteBuffer): PartitionData = { + val partition = buffer.getInt + val error = buffer.getInt + val initialOffset = buffer.getLong + val messageSetSize = buffer.getInt + val messageSetBuffer = buffer.slice() + messageSetBuffer.limit(messageSetSize) + buffer.position(buffer.position + messageSetSize) + new PartitionData(partition, error, initialOffset, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error)) + } +} + +case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) { + val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue() +} + +object TopicData { + def readFrom(buffer: ByteBuffer): TopicData = { + val topic = Utils.readShortString(buffer, "UTF-8") + val partitionCount = buffer.getInt + val partitions = new Array[PartitionData](partitionCount) + for(i <- 0 until partitions.length) + partitions(i) = PartitionData.readFrom(buffer) + new TopicData(topic, partitions.sortBy(_.partition)) + } + + def findPartition(data: Array[PartitionData], partition: Int): Option[PartitionData] = { + if(data == null || data.size == 0) + return None + + var (low, high) = (0, data.size-1) + while(low <= high) { + val mid = (low + high) / 2 + val found = data(mid) + if(found.partition == partition) + return Some(found) + else if(partition < found.partition) + high = mid - 1 + else + low = mid + 1 + } + None + } +} + +case class TopicData(topic: String, partitionData: Array[PartitionData]) { + val sizeInBytes = 2 + topic.length + partitionData.foldLeft(4)(_ + _.sizeInBytes) +} + +object FetchResponse { + def readFrom(buffer: ByteBuffer): FetchResponse = { + val correlationId = buffer.getInt + val versionId = buffer.getShort + val error = buffer.getShort + val dataCount = buffer.getInt + val data = new Array[TopicData](dataCount) + for(i <- 0 until data.length) + data(i) = TopicData.readFrom(buffer) + new FetchResponse(correlationId, versionId, error, data) + } +} + +case class FetchResponse(correlationId: Int, versionId: Short, error: Short, data: Array[TopicData]) { + + val sizeInBytes = 4 + 2 + 2 + data.foldLeft(4)(_ + _.sizeInBytes) + + lazy val topicMap = data.groupBy(_.topic).mapValues(_.head) + + def messageSet(topic: String, partition: Int): MessageSet = { + topicMap.get(topic) match { + case Some(topicData) => + TopicData.findPartition(topicData.partitionData, partition).map(_.messages).getOrElse(MessageSet.Empty) + case None => + MessageSet.Empty + } + } +} + +// SENDS + +class PartitionDataSend(val partitionData: PartitionData) extends Send { + private val messageSize = partitionData.messages.sizeInBytes + private var messagesSentSize = 0L + + private val buffer = ByteBuffer.allocate(20) + buffer.putInt(partitionData.partition) + buffer.putInt(partitionData.error) + buffer.putLong(partitionData.initialOffset) + buffer.putInt(partitionData.messages.sizeInBytes.intValue()) + buffer.rewind() + + def complete = !buffer.hasRemaining && messagesSentSize >= messageSize + + def writeTo(channel: GatheringByteChannel): Int = { + var written = 0 + if(buffer.hasRemaining) + written += channel.write(buffer) + 
if(!buffer.hasRemaining && messagesSentSize < messageSize) { + val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt + messagesSentSize += bytesSent + written += bytesSent + } + written + } +} + +class TopicDataSend(val topicData: TopicData) extends Send { + val size = topicData.sizeInBytes + + var sent = 0 + + private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4) + Utils.writeShortString(buffer, topicData.topic, "UTF-8") + buffer.putInt(topicData.partitionData.length) + buffer.rewind() + + val sends = new MultiSend(topicData.partitionData.map(new PartitionDataSend(_)).toList) { + val expectedBytesToWrite = topicData.partitionData.foldLeft(0)(_ + _.sizeInBytes) + } + + def complete = sent >= size + + def writeTo(channel: GatheringByteChannel): Int = { + expectIncomplete() + var written = 0 + if(buffer.hasRemaining) + written += channel.write(buffer) + if(!buffer.hasRemaining && !sends.complete) { + val bytesSent = sends.writeCompletely(channel) + written += bytesSent + } + sent += written + written + } +} + +class FetchResponseSend(val fetchResponse: FetchResponse) extends Send { + private val size = fetchResponse.sizeInBytes + + private var sent = 0 + + private val buffer = ByteBuffer.allocate(18) + buffer.putInt(size + 2) + buffer.putShort(ErrorMapping.NoError.shortValue()) + buffer.putInt(fetchResponse.correlationId) + buffer.putShort(fetchResponse.versionId) + buffer.putShort(fetchResponse.error) + buffer.putInt(fetchResponse.data.length) + buffer.rewind() + + val sends = new MultiSend(fetchResponse.data.map(new TopicDataSend(_)).toList) { + val expectedBytesToWrite = fetchResponse.data.foldLeft(0)(_ + _.sizeInBytes) + } + + def complete = sent >= sendSize + + def writeTo(channel: GatheringByteChannel):Int = { + expectIncomplete() + var written = 0 + if(buffer.hasRemaining) + written += channel.write(buffer) + if(!buffer.hasRemaining && !sends.complete) { + written += sends.writeCompletely(channel) + } + sent += written + written + } + + def sendSize = 4 + 2 + fetchResponse.sizeInBytes + +} diff --git a/core/src/main/scala/kafka/api/MultiFetchRequest.scala b/core/src/main/scala/kafka/api/MultiFetchRequest.scala deleted file mode 100644 index 6ecc619..0000000 --- a/core/src/main/scala/kafka/api/MultiFetchRequest.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.api - -import java.nio._ -import kafka.network._ - -object MultiFetchRequest { - def readFrom(buffer: ByteBuffer): MultiFetchRequest = { - val count = buffer.getShort - val fetches = new Array[FetchRequest](count) - for(i <- 0 until fetches.length) - fetches(i) = FetchRequest.readFrom(buffer) - new MultiFetchRequest(fetches) - } -} - -class MultiFetchRequest(val fetches: Array[FetchRequest]) extends Request(RequestKeys.MultiFetch) { - def writeTo(buffer: ByteBuffer) { - if(fetches.length > Short.MaxValue) - throw new IllegalArgumentException("Number of requests in MultiFetchRequest exceeds " + Short.MaxValue + ".") - buffer.putShort(fetches.length.toShort) - for(fetch <- fetches) - fetch.writeTo(buffer) - } - - def sizeInBytes: Int = { - var size = 2 - for(fetch <- fetches) - size += fetch.sizeInBytes - size - } - - - override def toString(): String = { - val buffer = new StringBuffer - for(fetch <- fetches) { - buffer.append(fetch.toString) - buffer.append(",") - } - buffer.toString - } -} diff --git a/core/src/main/scala/kafka/api/MultiFetchResponse.scala b/core/src/main/scala/kafka/api/MultiFetchResponse.scala deleted file mode 100644 index 9eefa02..0000000 --- a/core/src/main/scala/kafka/api/MultiFetchResponse.scala +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.api - -import java.nio._ -import collection.mutable -import kafka.utils.IteratorTemplate -import kafka.message._ - -class MultiFetchResponse(val buffer: ByteBuffer, val numSets: Int, val offsets: Array[Long]) extends Iterable[ByteBufferMessageSet] { - private val messageSets = new mutable.ListBuffer[ByteBufferMessageSet] - - for(i <- 0 until numSets) { - val size = buffer.getInt() - val errorCode: Int = buffer.getShort() - val copy = buffer.slice() - val payloadSize = size - 2 - copy.limit(payloadSize) - buffer.position(buffer.position + payloadSize) - messageSets += new ByteBufferMessageSet(copy, offsets(i), errorCode) - } - - def iterator : Iterator[ByteBufferMessageSet] = { - new IteratorTemplate[ByteBufferMessageSet] { - val iter = messageSets.iterator - - override def makeNext(): ByteBufferMessageSet = { - if(iter.hasNext) - iter.next - else - return allDone - } - } - } - - override def toString() = this.messageSets.toString -} diff --git a/core/src/main/scala/kafka/javaapi/FetchResponse.scala b/core/src/main/scala/kafka/javaapi/FetchResponse.scala new file mode 100644 index 0000000..c160dd7 --- /dev/null +++ b/core/src/main/scala/kafka/javaapi/FetchResponse.scala @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.javaapi + +import kafka.api.TopicData + + +class FetchResponse(val correlationId: Int, + val versionId: Short, + val error: Short, + val data: Array[TopicData]) { + + private val underlying = new kafka.api.FetchResponse(correlationId, versionId, error, data) + + def messageSet(topic: String, partition: Int): kafka.javaapi.message.MessageSet = { + import Implicits._ + underlying.messageSet(topic, partition).asInstanceOf[kafka.message.ByteBufferMessageSet] + } +} diff --git a/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala b/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala deleted file mode 100644 index 3bf5f44..0000000 --- a/core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package kafka.javaapi - -import kafka.utils.IteratorTemplate -import java.nio.ByteBuffer -import message.ByteBufferMessageSet - -class MultiFetchResponse(buffer: ByteBuffer, numSets: Int, offsets: Array[Long]) extends java.lang.Iterable[ByteBufferMessageSet] { - val underlyingBuffer = ByteBuffer.wrap(buffer.array) - // this has the side effect of setting the initial position of buffer correctly - val errorCode = underlyingBuffer.getShort - - import Implicits._ - val underlying = new kafka.api.MultiFetchResponse(underlyingBuffer, numSets, offsets) - - override def toString() = underlying.toString - - def iterator : java.util.Iterator[ByteBufferMessageSet] = { - new IteratorTemplate[ByteBufferMessageSet] { - val iter = underlying.iterator - override def makeNext(): ByteBufferMessageSet = { - if(iter.hasNext) - iter.next - else - return allDone - } - } - } -} --
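The patch's central API change — replacing per-topic FetchRequest objects and multifetch with one builder-assembled request whose response is keyed by (topic, partition) — can be exercised end to end as below. This is an illustrative sketch, not part of the patch: the broker address, buffer size, client id and topic names are placeholders, and it assumes the 0.7-era kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize) constructor used elsewhere in these tests.

```scala
import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

object FetchBuilderSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder broker address, socket timeout and fetch buffer size.
    val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024)

    // One request now carries several topic/partition fetches.
    val request = new FetchRequestBuilder()
      .correlationId(0)
      .clientId("FetchBuilderSketch")
      .addFetch("topic-a", 0, 0L, 10000)
      .addFetch("topic-b", 0, 0L, 10000)
      .build()

    val response = consumer.fetch(request)

    // Message sets are looked up by (topic, partition) instead of by request order.
    for (topic <- List("topic-a", "topic-b")) {
      val messages = response.messageSet(topic, 0)
      for (messageAndOffset <- messages)
        println(topic + " -> offset " + messageAndOffset.offset)
    }
    consumer.close()
  }
}
```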
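The updated tests expect OffsetOutOfRangeException and InvalidPartitionException to surface per partition, when the corresponding message set is iterated, rather than when a whole multifetch response is walked. Below is a hedged sketch of that handling pattern; the helper name, the fetch size and the choice to return 0 on error are illustrative, not from the patch.

```scala
import kafka.api.FetchRequestBuilder
import kafka.common.{InvalidPartitionException, OffsetOutOfRangeException}
import kafka.consumer.SimpleConsumer

object SafeFetchSketch {
  // Counts the messages fetched for one (topic, partition); errors surface during iteration.
  def fetchedMessageCount(consumer: SimpleConsumer, topic: String, partition: Int, offset: Long): Int = {
    val request = new FetchRequestBuilder()
      .clientId("SafeFetchSketch")
      .addFetch(topic, partition, offset, 10000)
      .build()
    val response = consumer.fetch(request)
    try {
      // Forcing iteration is what raises per-partition errors carried in the message set.
      response.messageSet(topic, partition).iterator.size
    } catch {
      case e: OffsetOutOfRangeException => 0 // caller may reset its offset
      case e: InvalidPartitionException => 0 // caller asked for a partition that does not exist
    }
  }
}
```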
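SimpleConsumerPerformance advances its own offset by the validBytes of each fetched set and bumps the correlation id on every request, while the shutdown and log4j tests use the same "refetch until validBytes > 0" idiom. The sketch below combines both patterns; host, port, topic, sizes, the retry sleep and the stop condition are placeholders rather than values from the patch.

```scala
import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer
import kafka.message.ByteBufferMessageSet

object SequentialConsumeSketch {
  def main(args: Array[String]): Unit = {
    val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024)
    var offset = 0L
    var requestId = 0
    var done = false
    while (!done) {
      val request = new FetchRequestBuilder()
        .correlationId(requestId)
        .clientId("SequentialConsumeSketch")
        .addFetch("test-topic", 0, offset, 64 * 1024)
        .build()
      val response = consumer.fetch(request)
      val messages = response.messageSet("test-topic", 0).asInstanceOf[ByteBufferMessageSet]
      if (messages.validBytes == 0) {
        done = true                  // nothing new; a real consumer would back off and retry
      } else {
        offset += messages.validBytes // next fetch starts after everything consumed so far
      }
      requestId += 1
    }
    consumer.close()
  }
}
```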
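TopicData.readFrom sorts partition data by partition id precisely so that findPartition can resolve a lookup with a binary search instead of a linear scan. A standalone sketch of that lookup follows, using a plain sorted Array[Int] in place of PartitionData.

```scala
object PartitionLookupSketch {
  // Binary search over partition ids sorted in ascending order, mirroring TopicData.findPartition.
  def findIndex(sortedPartitions: Array[Int], partition: Int): Option[Int] = {
    var low = 0
    var high = sortedPartitions.length - 1
    while (low <= high) {
      val mid = (low + high) / 2
      if (sortedPartitions(mid) == partition)
        return Some(mid)
      else if (partition < sortedPartitions(mid))
        high = mid - 1
      else
        low = mid + 1
    }
    None
  }

  def main(args: Array[String]): Unit = {
    val partitions = Array(0, 1, 3, 7)
    println(findIndex(partitions, 3)) // Some(2)
    println(findIndex(partitions, 5)) // None
  }
}
```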
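PartitionData.readFrom expects, in order: partition (int), error code (int), initial offset (long), message-set size (int) and then exactly that many message bytes, which is where sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes comes from. The self-contained round trip below mirrors that layout over a plain ByteBuffer; the arbitrary payload bytes stand in for a serialized message set.

```scala
import java.nio.ByteBuffer

object PartitionDataLayoutSketch {
  def main(args: Array[String]): Unit = {
    val payload = "stand-in for message bytes".getBytes("UTF-8")
    val buffer = ByteBuffer.allocate(4 + 4 + 8 + 4 + payload.length) // matches PartitionData.sizeInBytes
    buffer.putInt(2)              // partition
    buffer.putInt(0)              // error code (0 = no error)
    buffer.putLong(0L)            // initial offset
    buffer.putInt(payload.length) // message set size
    buffer.put(payload)
    buffer.rewind()

    // Reading mirrors PartitionData.readFrom: slice out exactly messageSetSize bytes.
    val partition = buffer.getInt
    val error = buffer.getInt
    val initialOffset = buffer.getLong
    val messageSetSize = buffer.getInt
    val messageSetBuffer = buffer.slice()
    messageSetBuffer.limit(messageSetSize)
    println("partition=" + partition + " error=" + error +
      " initialOffset=" + initialOffset + " messageSetBytes=" + messageSetSize)
  }
}
```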
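PartitionDataSend, TopicDataSend and FetchResponseSend all follow the same shape: drain a small pre-built header buffer first, then stream the payload, and report completion only once both are fully written so the caller can keep retrying on a non-blocking channel. The simplified sketch below shows that pattern; the class name, the 4-byte header and the temp-file channel are illustrative, not the patch's classes.

```scala
import java.io.{File, FileOutputStream}
import java.nio.ByteBuffer
import java.nio.channels.GatheringByteChannel

class HeaderThenPayloadSend(payload: ByteBuffer) {
  private val header = ByteBuffer.allocate(4)
  header.putInt(payload.remaining()) // header carries the payload size
  header.rewind()

  // Complete only when both the header and the payload have been fully drained.
  def complete: Boolean = !header.hasRemaining && !payload.hasRemaining

  def writeTo(channel: GatheringByteChannel): Int = {
    var written = 0
    if (header.hasRemaining)
      written += channel.write(header)
    if (!header.hasRemaining && payload.hasRemaining)
      written += channel.write(payload)
    written
  }
}

object HeaderThenPayloadSendDemo {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("send-sketch", ".bin")
    val channel = new FileOutputStream(file).getChannel // FileChannel is a GatheringByteChannel
    val send = new HeaderThenPayloadSend(ByteBuffer.wrap("hello".getBytes("UTF-8")))
    while (!send.complete)
      send.writeTo(channel) // a real server would call this as the socket becomes writable
    channel.close()
    println("wrote " + file.length() + " bytes") // 4-byte header + 5-byte payload
  }
}
```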
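testConsumerNotExistTopic relies on a fetch for an unknown topic coming back empty — FetchResponse.messageSet falls back to MessageSet.Empty when the topic or partition is absent from the response — rather than raising an error. A minimal sketch of that check, with placeholder host and port:

```scala
import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

object UnknownTopicFetchSketch {
  def main(args: Array[String]): Unit = {
    val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024)
    val response = consumer.fetch(
      new FetchRequestBuilder().addFetch("no-such-topic", 0, 0L, 10000).build())
    // An absent topic yields an empty message set, so the iterator has no elements.
    println(response.messageSet("no-such-topic", 0).iterator.hasNext) // expected: false
    consumer.close()
  }
}
```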