diff --git contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java index 5166358..09b1264 100644 --- contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java +++ contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java @@ -26,10 +26,10 @@ import java.util.Random; import kafka.etl.KafkaETLKey; import kafka.etl.KafkaETLRequest; import kafka.etl.Props; -import kafka.javaapi.message.ByteBufferMessageSet; -import kafka.javaapi.producer.SyncProducer; +import kafka.javaapi.producer.Producer; +import kafka.javaapi.producer.ProducerData; import kafka.message.Message; -import kafka.producer.SyncProducerConfig; +import kafka.producer.ProducerConfig; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; @@ -47,7 +47,7 @@ public class DataGenerator { System.currentTimeMillis()); protected Props _props; - protected SyncProducer _producer = null; + protected Producer _producer = null; protected URI _uri = null; protected String _topic; protected int _count; @@ -70,12 +70,12 @@ public class DataGenerator { System.out.println("server uri:" + _uri.toString()); Properties producerProps = new Properties(); - producerProps.put("host", _uri.getHost()); - producerProps.put("port", String.valueOf(_uri.getPort())); + producerProps.put("broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort())); producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE)); producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT)); producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL)); - _producer = new SyncProducer(new SyncProducerConfig(producerProps)); + + _producer = new Producer(new ProducerConfig(producerProps)); } @@ -91,7 +91,8 @@ public class DataGenerator { } // send events System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri); - _producer.send(_topic, new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, list)); + ProducerData pd = new ProducerData(_topic, null, list); + _producer.send(pd); // close the producer _producer.close(); diff --git core/src/main/scala/kafka/api/FetchRequest.scala core/src/main/scala/kafka/api/FetchRequest.scala index 96b4e13..d924779 100644 --- core/src/main/scala/kafka/api/FetchRequest.scala +++ core/src/main/scala/kafka/api/FetchRequest.scala @@ -18,60 +18,13 @@ package kafka.api import java.nio.ByteBuffer -import kafka.utils.Utils -import scala.collection.mutable.{HashMap, Buffer, ListBuffer} -import kafka.common.{KafkaException, FetchRequestFormatException} +import kafka.utils.{nonthreadsafe, Utils} +import scala.collection.immutable.Map +import kafka.common.TopicAndPartition -object OffsetDetail { - def readFrom(buffer: ByteBuffer): OffsetDetail = { - val topic = Utils.readShortString(buffer, "UTF-8") +case class PartitionFetchInfo(offset: Long, fetchSize: Int) - val partitionsCount = buffer.getInt - val partitions = new Array[Int](partitionsCount) - for (i <- 0 until partitions.length) - partitions(i) = buffer.getInt - - val offsetsCount = buffer.getInt - val offsets = new Array[Long](offsetsCount) - for (i <- 0 until offsets.length) - offsets(i) = buffer.getLong - - val fetchesCount = buffer.getInt - val fetchSizes = new Array[Int](fetchesCount) - for (i <- 0 until fetchSizes.length) - fetchSizes(i) = buffer.getInt - - new OffsetDetail(topic, partitions, offsets, fetchSizes) - } - -} - -case class OffsetDetail(topic: String, partitions: Seq[Int], offsets: Seq[Long], fetchSizes: Seq[Int]) { - - def writeTo(buffer: ByteBuffer) { - Utils.writeShortString(buffer, topic, "UTF-8") - - if(partitions.size > Int.MaxValue || offsets.size > Int.MaxValue || fetchSizes.size > Int.MaxValue) - throw new KafkaException("Number of fetches in FetchRequest exceeds " + Int.MaxValue + ".") - - buffer.putInt(partitions.length) - partitions.foreach(buffer.putInt(_)) - - buffer.putInt(offsets.length) - offsets.foreach(buffer.putLong(_)) - - buffer.putInt(fetchSizes.length) - fetchSizes.foreach(buffer.putInt(_)) - } - - def sizeInBytes(): Int = { - 2 + topic.length() + // topic string - partitions.foldLeft(4)((s, _) => s + 4) + // each request partition (int) - offsets.foldLeft(4)((s, _) => s + 8) + // each request offset (long) - fetchSizes.foldLeft(4)((s,_) => s + 4) // each request fetch size - } -} object FetchRequest { val CurrentVersion = 1.shortValue() @@ -85,18 +38,23 @@ object FetchRequest { def readFrom(buffer: ByteBuffer): FetchRequest = { val versionId = buffer.getShort val correlationId = buffer.getInt - val clientId = Utils.readShortString(buffer, "UTF-8") + val clientId = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) val replicaId = buffer.getInt val maxWait = buffer.getInt val minBytes = buffer.getInt - val offsetsCount = buffer.getInt - val offsetInfo = new Array[OffsetDetail](offsetsCount) - for(i <- 0 until offsetInfo.length) - offsetInfo(i) = OffsetDetail.readFrom(buffer) - - new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetInfo) + val topicCount = buffer.getInt + val pairs = (1 to topicCount).flatMap(_ => { + val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val partitionCount = buffer.getInt + (1 to partitionCount).map(_ => { + val partitionId = buffer.getInt + val offset = buffer.getLong + val fetchSize = buffer.getInt + (TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset, fetchSize)) + }) + }) + FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, Map(pairs:_*)) } - } case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, @@ -105,50 +63,63 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, replicaId: Int = FetchRequest.DefaultReplicaId, maxWait: Int = FetchRequest.DefaultMaxWait, minBytes: Int = FetchRequest.DefaultMinBytes, - offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.FetchKey)) { - - // ensure that a topic "X" appears in at most one OffsetDetail - def validate() { - if(offsetInfo == null) - throw new FetchRequestFormatException("FetchRequest has null offsetInfo") - - // We don't want to get fancy with groupBy's and filter's since we just want the first occurrence - var topics = Set[String]() - val iter = offsetInfo.iterator - while(iter.hasNext) { - val offsetData = iter.next() - val topic = offsetData.topic - if(topics.contains(topic)) - throw new FetchRequestFormatException("FetchRequest has multiple OffsetDetails for topic: " + topic) - else - topics += topic - } - } + requestInfo: Map[TopicAndPartition, PartitionFetchInfo]) + extends RequestOrResponse(Some(RequestKeys.FetchKey)) { - def writeTo(buffer: ByteBuffer) { - // validate first - validate() + /** + * Partitions the request info into a map of maps (one for each topic). + */ + lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) + def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) buffer.putInt(correlationId) - Utils.writeShortString(buffer, clientId, "UTF-8") + Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset) buffer.putInt(replicaId) buffer.putInt(maxWait) buffer.putInt(minBytes) - buffer.putInt(offsetInfo.size) - for(topicDetail <- offsetInfo) { - topicDetail.writeTo(buffer) + buffer.putInt(requestInfoGroupedByTopic.size) // topic count + requestInfoGroupedByTopic.foreach { + case (topic, partitionFetchInfos) => + Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) + buffer.putInt(partitionFetchInfos.size) // partition count + partitionFetchInfos.foreach { + case (TopicAndPartition(_, partition), PartitionFetchInfo(offset, fetchSize)) => + buffer.putInt(partition) + buffer.putLong(offset) + buffer.putInt(fetchSize) + } } } - def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes()) + def sizeInBytes: Int = { + 2 + /* versionId */ + 4 + /* correlationId */ + Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + + 4 + /* replicaId */ + 4 + /* maxWait */ + 4 + /* minBytes */ + 4 + /* topic count */ + requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => { + val (topic, partitionFetchInfos) = currTopic + foldedTopics + + Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) + + 4 + /* partition count */ + partitionFetchInfos.size * ( + 4 + /* partition id */ + 8 + /* offset */ + 4 /* fetch size */ + ) + }) + } - def numPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size) + def isFromFollower = replicaId != FetchRequest.NonFollowerId - def isFromFollower(): Boolean = replicaId != FetchRequest.NonFollowerId + def numPartitions = requestInfo.size } +@nonthreadsafe class FetchRequestBuilder() { private var correlationId = FetchRequest.DefaultCorrelationId private val versionId = FetchRequest.CurrentVersion @@ -156,13 +127,10 @@ class FetchRequestBuilder() { private var replicaId = FetchRequest.DefaultReplicaId private var maxWait = FetchRequest.DefaultMaxWait private var minBytes = FetchRequest.DefaultMinBytes - private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]] + private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo] def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = { - val topicData = requestMap.getOrElseUpdate(topic, (ListBuffer[Int](), ListBuffer[Long](), ListBuffer[Int]())) - topicData._1.append(partition) - topicData._2.append(offset) - topicData._3.append(fetchSize) + requestMap.put(TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) this } @@ -191,10 +159,5 @@ class FetchRequestBuilder() { this } - def build() = { - val offsetDetails = requestMap.map{ topicData => - new OffsetDetail(topicData._1, topicData._2._1.toArray, topicData._2._2.toArray, topicData._2._3.toArray) - } - new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetDetails.toArray[OffsetDetail]) - } + def build() = FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, requestMap.toMap) } diff --git core/src/main/scala/kafka/api/FetchResponse.scala core/src/main/scala/kafka/api/FetchResponse.scala index 3cf57b8..6a8f534 100644 --- core/src/main/scala/kafka/api/FetchResponse.scala +++ core/src/main/scala/kafka/api/FetchResponse.scala @@ -19,27 +19,35 @@ package kafka.api import java.nio.ByteBuffer import java.nio.channels.GatheringByteChannel -import kafka.common.ErrorMapping +import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.message.{MessageSet, ByteBufferMessageSet} import kafka.network.{MultiSend, Send} import kafka.utils.Utils object PartitionData { def readFrom(buffer: ByteBuffer): PartitionData = { - val error = buffer.getShort val partition = buffer.getInt + val error = buffer.getShort val initialOffset = buffer.getLong - val hw = buffer.getLong() + val hw = buffer.getLong val messageSetSize = buffer.getInt val messageSetBuffer = buffer.slice() messageSetBuffer.limit(messageSetSize) buffer.position(buffer.position + messageSetSize) new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset)) } + + val headerSize = + 4 + /* partition */ + 2 + /* error code */ + 8 + /* initialOffset */ + 8 + /* high watermark */ + 4 /* messageSetSize */ } case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) { - val sizeInBytes = 4 + 2 + 8 + 4 + messages.sizeInBytes.intValue() + 8 + + val sizeInBytes = PartitionData.headerSize + messages.sizeInBytes.intValue() def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages) } @@ -50,17 +58,17 @@ class PartitionDataSend(val partitionData: PartitionData) extends Send { private val messageSize = partitionData.messages.sizeInBytes private var messagesSentSize = 0L - private val buffer = ByteBuffer.allocate(26) - buffer.putShort(partitionData.error) + private val buffer = ByteBuffer.allocate(PartitionData.headerSize) buffer.putInt(partitionData.partition) + buffer.putShort(partitionData.error) buffer.putLong(partitionData.initialOffset) buffer.putLong(partitionData.hw) buffer.putInt(partitionData.messages.sizeInBytes.intValue()) buffer.rewind() - def complete = !buffer.hasRemaining && messagesSentSize >= messageSize + override def complete = !buffer.hasRemaining && messagesSentSize >= messageSize - def writeTo(channel: GatheringByteChannel): Int = { + override def writeTo(channel: GatheringByteChannel): Int = { var written = 0 if(buffer.hasRemaining) written += channel.write(buffer) @@ -75,63 +83,43 @@ class PartitionDataSend(val partitionData: PartitionData) extends Send { object TopicData { def readFrom(buffer: ByteBuffer): TopicData = { - val topic = Utils.readShortString(buffer, "UTF-8") + val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) val partitionCount = buffer.getInt - val partitions = new Array[PartitionData](partitionCount) - for(i <- 0 until partitionCount) - partitions(i) = PartitionData.readFrom(buffer) - new TopicData(topic, partitions.sortBy(_.partition)) + val topicPartitionDataPairs = (1 to partitionCount).map(_ => { + val partitionData = PartitionData.readFrom(buffer) + (TopicAndPartition(topic, partitionData.partition), partitionData) + }) + TopicData(topic, Map(topicPartitionDataPairs:_*)) } - def findPartition(data: Array[PartitionData], partition: Int): Option[PartitionData] = { - if(data == null || data.size == 0) - return None - - var (low, high) = (0, data.size-1) - while(low <= high) { - val mid = (low + high) / 2 - val found = data(mid) - if(found.partition == partition) - return Some(found) - else if(partition < found.partition) - high = mid - 1 - else - low = mid + 1 - } - None - } + def headerSize(topic: String) = + Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) + + 4 /* partition count */ } -case class TopicData(topic: String, partitionDataArray: Array[PartitionData]) { - val sizeInBytes = 2 + topic.length + partitionDataArray.foldLeft(4)(_ + _.sizeInBytes) +case class TopicData(topic: String, partitionData: Map[TopicAndPartition, PartitionData]) { + val sizeInBytes = + TopicData.headerSize(topic) + partitionData.values.foldLeft(0)(_ + _.sizeInBytes) - // need to override equals due to brokern java-arrays equals functionality - override def equals(other: Any): Boolean = { - other match { - case that: TopicData => - ( topic == that.topic && - partitionDataArray.toSeq == that.partitionDataArray.toSeq ) - case _ => false - } - } + val headerSize = TopicData.headerSize(topic) } class TopicDataSend(val topicData: TopicData) extends Send { - val size = topicData.sizeInBytes + private val size = topicData.sizeInBytes - var sent = 0 + private var sent = 0 - private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4) - Utils.writeShortString(buffer, topicData.topic, "UTF-8") - buffer.putInt(topicData.partitionDataArray.length) + override def complete = sent >= size + + private val buffer = ByteBuffer.allocate(topicData.headerSize) + Utils.writeShortString(buffer, topicData.topic, RequestOrResponse.DefaultCharset) + buffer.putInt(topicData.partitionData.size) buffer.rewind() - val sends = new MultiSend(topicData.partitionDataArray.map(new PartitionDataSend(_)).toList) { - val expectedBytesToWrite = topicData.partitionDataArray.foldLeft(0)(_ + _.sizeInBytes) + val sends = new MultiSend(topicData.partitionData.toList.map(d => new PartitionDataSend(d._2))) { + val expectedBytesToWrite = topicData.sizeInBytes - topicData.headerSize } - def complete = sent >= size - def writeTo(channel: GatheringByteChannel): Int = { expectIncomplete() var written = 0 @@ -146,46 +134,59 @@ class TopicDataSend(val topicData: TopicData) extends Send { } +object FetchResponse { + val headerSize = + 2 + /* versionId */ + 4 + /* correlationId */ + 4 /* topic count */ -object FetchResponse { def readFrom(buffer: ByteBuffer): FetchResponse = { val versionId = buffer.getShort - val errorCode = buffer.getShort val correlationId = buffer.getInt - val dataCount = buffer.getInt - val data = new Array[TopicData](dataCount) - for(i <- 0 until data.length) - data(i) = TopicData.readFrom(buffer) - new FetchResponse(versionId, correlationId, data, errorCode) + val topicCount = buffer.getInt + val pairs = (1 to topicCount).flatMap(_ => { + val topicData = TopicData.readFrom(buffer) + topicData.partitionData.values.map( + partitionData => (TopicAndPartition(topicData.topic, partitionData.partition), partitionData) + ) + }) + FetchResponse(versionId, correlationId, Map(pairs:_*)) } } case class FetchResponse(versionId: Short, correlationId: Int, - data: Array[TopicData], - errorCode: Short = ErrorMapping.NoError) { - - val sizeInBytes = 2 + 4 + 2 + data.foldLeft(4)(_ + _.sizeInBytes) - - lazy val topicMap = data.groupBy(_.topic).mapValues(_.head) - - def messageSet(topic: String, partition: Int): ByteBufferMessageSet = { - val messageSet = topicMap.get(topic) match { - case Some(topicData) => - TopicData.findPartition(topicData.partitionDataArray, partition).map(_.messages).getOrElse(MessageSet.Empty) - case None => - MessageSet.Empty - } - messageSet.asInstanceOf[ByteBufferMessageSet] - } - - def highWatermark(topic: String, partition: Int): Long = { - topicMap.get(topic) match { - case Some(topicData) => - TopicData.findPartition(topicData.partitionDataArray, partition).map(_.hw).getOrElse(-1L) - case None => -1L + data: Map[TopicAndPartition, PartitionData]) { + + /** + * Partitions the data into a map of maps (one for each topic). + */ + lazy val dataGroupedByTopic = data.groupBy(_._1.topic) + + val sizeInBytes = + FetchResponse.headerSize + + dataGroupedByTopic.foldLeft(0) ((folded, curr) => { + val topicData = TopicData(curr._1, curr._2) + folded + + topicData.sizeInBytes + }) + + def messageSet(topic: String, partition: Int): ByteBufferMessageSet = data.get(TopicAndPartition(topic, partition)) + .map(_.messages).getOrElse(MessageSet.Empty).asInstanceOf[ByteBufferMessageSet] + + def highWatermark(topic: String, partition: Int) = + data.get(TopicAndPartition(topic, partition)).map(_.hw).getOrElse(-1L) + + def hasError = data.values.exists(_.error != ErrorMapping.NoError) + + def errorCode(topic: String, partition: Int) = { + val topicAndPartition = TopicAndPartition(topic, partition) + data.get(topicAndPartition) match { + case Some(partitionData) => partitionData.error + case _ => + throw new IllegalArgumentException("No partition %s in fetch response %s".format(topicAndPartition, this.toString)) } } } @@ -193,21 +194,25 @@ case class FetchResponse(versionId: Short, class FetchResponseSend(val fetchResponse: FetchResponse) extends Send { private val size = fetchResponse.sizeInBytes + private var sent = 0 - - private val buffer = ByteBuffer.allocate(16) + + private val sendSize = 4 /* for size */ + size + + override def complete = sent >= sendSize + + private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize) buffer.putInt(size) buffer.putShort(fetchResponse.versionId) - buffer.putShort(fetchResponse.errorCode) buffer.putInt(fetchResponse.correlationId) - buffer.putInt(fetchResponse.data.length) + buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count buffer.rewind() - - val sends = new MultiSend(fetchResponse.data.map(new TopicDataSend(_)).toList) { - val expectedBytesToWrite = fetchResponse.data.foldLeft(0)(_ + _.sizeInBytes) - } - def complete = sent >= sendSize + val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map { + case(topic, data) => new TopicDataSend(TopicData(topic, data)) + }) { + val expectedBytesToWrite = fetchResponse.sizeInBytes - FetchResponse.headerSize + } def writeTo(channel: GatheringByteChannel):Int = { expectIncomplete() @@ -220,6 +225,5 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send { sent += written written } - - def sendSize = 4 + fetchResponse.sizeInBytes } + diff --git core/src/main/scala/kafka/api/ProducerRequest.scala core/src/main/scala/kafka/api/ProducerRequest.scala index efbaf8b..38458ba 100644 --- core/src/main/scala/kafka/api/ProducerRequest.scala +++ core/src/main/scala/kafka/api/ProducerRequest.scala @@ -20,96 +20,105 @@ package kafka.api import java.nio._ import kafka.message._ import kafka.utils._ +import scala.collection.Map +import kafka.common.TopicAndPartition object ProducerRequest { val CurrentVersion: Short = 0 - + def readFrom(buffer: ByteBuffer): ProducerRequest = { val versionId: Short = buffer.getShort val correlationId: Int = buffer.getInt - val clientId: String = Utils.readShortString(buffer, "UTF-8") + val clientId: String = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) val requiredAcks: Short = buffer.getShort val ackTimeoutMs: Int = buffer.getInt //build the topic structure val topicCount = buffer.getInt - val data = new Array[TopicData](topicCount) - for(i <- 0 until topicCount) { - val topic = Utils.readShortString(buffer, "UTF-8") - + val partitionDataPairs = (1 to topicCount).flatMap(_ => { + // process topic + val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) val partitionCount = buffer.getInt - //build the partition structure within this topic - val partitionData = new Array[PartitionData](partitionCount) - for (j <- 0 until partitionCount) { + (1 to partitionCount).map(_ => { val partition = buffer.getInt val messageSetSize = buffer.getInt val messageSetBuffer = new Array[Byte](messageSetSize) buffer.get(messageSetBuffer,0,messageSetSize) - partitionData(j) = new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))) - } - data(i) = new TopicData(topic,partitionData) - } - new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, data) + (TopicAndPartition(topic, partition), + new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))) + }) + }) + + ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, Map(partitionDataPairs:_*)) } } -case class ProducerRequest( versionId: Short, +case class ProducerRequest( versionId: Short = ProducerRequest.CurrentVersion, correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, - data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { + data: Map[TopicAndPartition, PartitionData]) + extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { + + /** + * Partitions the data into a map of maps (one for each topic). + */ + private lazy val dataGroupedByTopic = data.groupBy(_._1.topic) - def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) = + def this(correlationId: Int, + clientId: String, + requiredAcks: Short, + ackTimeoutMs: Int, + data: Map[TopicAndPartition, PartitionData]) = this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data) def writeTo(buffer: ByteBuffer) { buffer.putShort(versionId) buffer.putInt(correlationId) - Utils.writeShortString(buffer, clientId, "UTF-8") + Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset) buffer.putShort(requiredAcks) buffer.putInt(ackTimeoutMs) + //save the topic structure - buffer.putInt(data.size) //the number of topics - for(topicData <- data) { - Utils.writeShortString(buffer, topicData.topic, "UTF-8") //write the topic - buffer.putInt(topicData.partitionDataArray.size) //the number of partitions - for(partitionData <- topicData.partitionDataArray) { - buffer.putInt(partitionData.partition) - buffer.putInt(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.limit) - buffer.put(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer) - partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.rewind - } + buffer.putInt(dataGroupedByTopic.size) //the number of topics + dataGroupedByTopic.foreach { + case (topic, topicAndPartitionData) => + Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) //write the topic + buffer.putInt(topicAndPartitionData.size) //the number of partitions + topicAndPartitionData.foreach(partitionAndData => { + val partitionData = partitionAndData._2 + buffer.putInt(partitionData.partition) + buffer.putInt(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.limit) + buffer.put(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer) + partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.rewind + }) } } - def sizeInBytes(): Int = { - var size = 0 - //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size - size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4 - for(topicData <- data) { - size += 2 + topicData.topic.length + 4 - for(partitionData <- topicData.partitionDataArray) { - size += 4 + 4 + partitionData.messages.sizeInBytes.asInstanceOf[Int] + def sizeInBytes: Int = { + 2 + /* versionId */ + 4 + /* correlationId */ + Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + /* client id */ + 2 + /* requiredAcks */ + 4 + /* ackTimeoutMs */ + 4 + /* number of topics */ + dataGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => { + foldedTopics + + Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) + + 4 + /* the number of partitions */ + { + currTopic._2.foldLeft(0)((foldedPartitions, currPartition) => { + foldedPartitions + + 4 + /* partition id */ + 4 + /* byte-length of serialized messages */ + currPartition._2.messages.sizeInBytes.toInt + }) } - } - size - } - - // need to override case-class equals due to broken java-array equals() - override def equals(other: Any): Boolean = { - other match { - case that: ProducerRequest => - ( correlationId == that.correlationId && - clientId == that.clientId && - requiredAcks == that.requiredAcks && - ackTimeoutMs == that.ackTimeoutMs && - data.toSeq == that.data.toSeq ) - case _ => false - } + }) } - def topicPartitionCount = data.foldLeft(0)(_ + _.partitionDataArray.length) + def numPartitions = data.size } diff --git core/src/main/scala/kafka/api/ProducerResponse.scala core/src/main/scala/kafka/api/ProducerResponse.scala index dc110e0..b9093b7 100644 --- core/src/main/scala/kafka/api/ProducerResponse.scala +++ core/src/main/scala/kafka/api/ProducerResponse.scala @@ -18,57 +18,81 @@ package kafka.api import java.nio.ByteBuffer -import kafka.common.ErrorMapping +import kafka.utils.Utils +import scala.collection.Map +import kafka.common.{TopicAndPartition, ErrorMapping} object ProducerResponse { def readFrom(buffer: ByteBuffer): ProducerResponse = { val versionId = buffer.getShort val correlationId = buffer.getInt - val errorCode = buffer.getShort - val errorsSize = buffer.getInt - val errors = new Array[Short](errorsSize) - for( i <- 0 until errorsSize) { - errors(i) = buffer.getShort - } - val offsetsSize = buffer.getInt - val offsets = new Array[Long](offsetsSize) - for( i <- 0 until offsetsSize) { - offsets(i) = buffer.getLong - } - new ProducerResponse(versionId, correlationId, errors, offsets, errorCode) + val topicCount = buffer.getInt + val statusPairs = (1 to topicCount).flatMap(_ => { + val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset) + val partitionCount = buffer.getInt + (1 to partitionCount).map(_ => { + val partition = buffer.getInt + val error = buffer.getShort + val offset = buffer.getLong + (TopicAndPartition(topic, partition), ProducerResponseStatus(error, offset)) + }) + }) + + ProducerResponse(versionId, correlationId, Map(statusPairs:_*)) } } -case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short], - offsets: Array[Long], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{ - val sizeInBytes = 2 + 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length) +case class ProducerResponseStatus(error: Short, nextOffset: Long) + + +case class ProducerResponse(versionId: Short, + correlationId: Int, + status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse { + + /** + * Partitions the status map into a map of maps (one for each topic). + */ + private lazy val statusGroupedByTopic = status.groupBy(_._1.topic) + + def hasError = status.values.exists(_.error != ErrorMapping.NoError) + + val sizeInBytes = { + val groupedStatus = statusGroupedByTopic + 2 + /* version id */ + 4 + /* correlation id */ + 4 + /* topic count */ + groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => { + foldedTopics + + Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) + + 4 + /* partition count for this topic */ + currTopic._2.foldLeft (0) ((foldedPartitions, currPartition) => { + foldedPartitions + + 4 + /* partition id */ + 2 + /* error code */ + 8 /* offset */ + }) + }) + } def writeTo(buffer: ByteBuffer) { - /* version id */ + val groupedStatus = statusGroupedByTopic + buffer.putShort(versionId) - /* correlation id */ buffer.putInt(correlationId) - /* error code */ - buffer.putShort(errorCode) - /* errors */ - buffer.putInt(errors.length) - errors.foreach(buffer.putShort(_)) - /* offsets */ - buffer.putInt(offsets.length) - offsets.foreach(buffer.putLong(_)) - } + buffer.putInt(groupedStatus.size) // topic count - // need to override case-class equals due to broken java-array equals() - override def equals(other: Any): Boolean = { - other match { - case that: ProducerResponse => - ( correlationId == that.correlationId && - versionId == that.versionId && - errorCode == that.errorCode && - errors.toSeq == that.errors.toSeq && - offsets.toSeq == that.offsets.toSeq) - case _ => false - } + groupedStatus.foreach(topicStatus => { + val (topic, errorsAndOffsets) = topicStatus + Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) + buffer.putInt(errorsAndOffsets.size) // partition count + errorsAndOffsets.foreach { + case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset))) => + buffer.putInt(partition) + buffer.putShort(error) + buffer.putLong(nextOffset) + } + }) } -} \ No newline at end of file +} + diff --git core/src/main/scala/kafka/api/RequestOrResponse.scala core/src/main/scala/kafka/api/RequestOrResponse.scala index ac5b64e..611bb42 100644 --- core/src/main/scala/kafka/api/RequestOrResponse.scala +++ core/src/main/scala/kafka/api/RequestOrResponse.scala @@ -19,6 +19,12 @@ package kafka.api import java.nio._ + +object RequestOrResponse { + val DefaultCharset = "UTF-8" +} + + private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) { def sizeInBytes: Int diff --git core/src/main/scala/kafka/common/ErrorMapping.scala core/src/main/scala/kafka/common/ErrorMapping.scala index b3042e7..ba6f352 100644 --- core/src/main/scala/kafka/common/ErrorMapping.scala +++ core/src/main/scala/kafka/common/ErrorMapping.scala @@ -34,13 +34,12 @@ object ErrorMapping { val InvalidMessageCode : Short = 2 val UnknownTopicOrPartitionCode : Short = 3 val InvalidFetchSizeCode : Short = 4 - val InvalidFetchRequestFormatCode : Short = 5 - val LeaderNotAvailableCode : Short = 6 - val NotLeaderForPartitionCode : Short = 7 - val RequestTimedOutCode: Short = 8 - val BrokerNotAvailableCode: Short = 9 - val ReplicaNotAvailableCode: Short = 10 - val MessageSizeTooLargeCode: Short = 11 + val LeaderNotAvailableCode : Short = 5 + val NotLeaderForPartitionCode : Short = 6 + val RequestTimedOutCode: Short = 7 + val BrokerNotAvailableCode: Short = 8 + val ReplicaNotAvailableCode: Short = 9 + val MessageSizeTooLargeCode: Short = 10 private val exceptionToCode = Map[Class[Throwable], Short]( @@ -48,7 +47,6 @@ object ErrorMapping { classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode, classOf[UnknownTopicOrPartitionException].asInstanceOf[Class[Throwable]] -> UnknownTopicOrPartitionCode, classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode, - classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode, classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode, classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode, classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode, diff --git core/src/main/scala/kafka/common/FetchRequestFormatException.scala core/src/main/scala/kafka/common/FetchRequestFormatException.scala deleted file mode 100644 index 0bc7d4e..0000000 --- core/src/main/scala/kafka/common/FetchRequestFormatException.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.common - -class FetchRequestFormatException(val message: String) extends RuntimeException(message) { - def this() = this(null) -} diff --git core/src/main/scala/kafka/common/TopicAndPartition.scala core/src/main/scala/kafka/common/TopicAndPartition.scala new file mode 100644 index 0000000..4b1f3a3 --- /dev/null +++ core/src/main/scala/kafka/common/TopicAndPartition.scala @@ -0,0 +1,29 @@ +package kafka.common + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Convenience case class since (topic, partition) pairs are ubiquitous. + */ +case class TopicAndPartition(topic: String, partition: Int) { + + def this(tuple: (String, Int)) = this(tuple._1, tuple._2) + + def asTuple = (topic, partition) +} + diff --git core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index a94906d..3b9eec1 100644 --- core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -25,6 +25,7 @@ import scala.collection.mutable import java.util.concurrent.locks.ReentrantLock import kafka.utils.ZkUtils._ import kafka.utils.{ShutdownableThread, SystemTime} +import kafka.common.TopicAndPartition /** @@ -38,7 +39,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) { private var partitionMap: immutable.Map[(String, Int), PartitionTopicInfo] = null private var cluster: Cluster = null - private val noLeaderPartitionSet = new mutable.HashSet[(String, Int)] + private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition] private val lock = new ReentrantLock private val cond = lock.newCondition() private val leaderFinderThread = new ShutdownableThread(consumerIdString + "-leader-finder-thread"){ @@ -48,21 +49,22 @@ class ConsumerFetcherManager(private val consumerIdString: String, try { if (noLeaderPartitionSet.isEmpty) cond.await() - for ((topic, partitionId) <- noLeaderPartitionSet) { - // find the leader for this partition - getLeaderForPartition(zkClient, topic, partitionId) match { - case Some(leaderId) => - cluster.getBroker(leaderId) match { - case Some(broker) => - val pti = partitionMap((topic, partitionId)) - addFetcher(topic, partitionId, pti.getFetchOffset(), broker) - noLeaderPartitionSet.remove((topic, partitionId)) - case None => - error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started" - .format(leaderId, topic, partitionId)) - } - case None => // let it go since we will keep retrying - } + noLeaderPartitionSet.foreach { + case(TopicAndPartition(topic, partitionId)) => + // find the leader for this partition + getLeaderForPartition(zkClient, topic, partitionId) match { + case Some(leaderId) => + cluster.getBroker(leaderId) match { + case Some(broker) => + val pti = partitionMap((topic, partitionId)) + addFetcher(topic, partitionId, pti.getFetchOffset(), broker) + noLeaderPartitionSet.remove(TopicAndPartition(topic, partitionId)) + case None => + error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started" + .format(leaderId, topic, partitionId)) + } + case None => // let it go since we will keep retrying + } } } finally { lock.unlock() @@ -84,7 +86,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, try { partitionMap = topicInfos.map(tpi => ((tpi.topic, tpi.partitionId), tpi)).toMap this.cluster = cluster - noLeaderPartitionSet ++= topicInfos.map(tpi => (tpi.topic, tpi.partitionId)) + noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId)) cond.signalAll() } finally { lock.unlock() @@ -117,7 +119,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, pti } - def addPartitionsWithError(partitionList: Iterable[(String, Int)]) { + def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) { debug("adding partitions with error %s".format(partitionList)) lock.lock() try { diff --git core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index f13f449..01db46e 100644 --- core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -21,6 +21,8 @@ import kafka.cluster.Broker import kafka.server.AbstractFetcherThread import kafka.message.ByteBufferMessageSet import kafka.api.{FetchRequest, OffsetRequest, PartitionData} +import kafka.common.TopicAndPartition + class ConsumerFetcherThread(name: String, val config: ConsumerConfig, @@ -57,7 +59,7 @@ class ConsumerFetcherThread(name: String, } // any logic for partitions whose leader has changed - def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) { + def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) { consumerFetcherManager.addPartitionsWithError(partitions) } } diff --git core/src/main/scala/kafka/javaapi/FetchRequest.scala core/src/main/scala/kafka/javaapi/FetchRequest.scala new file mode 100644 index 0000000..44d148e --- /dev/null +++ core/src/main/scala/kafka/javaapi/FetchRequest.scala @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.javaapi + +import scala.collection.JavaConversions +import kafka.api.PartitionFetchInfo +import java.nio.ByteBuffer +import kafka.common.TopicAndPartition + + +class FetchRequest(correlationId: Int, + clientId: String, + replicaId: Int, + maxWait: Int, + minBytes: Int, + requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) { + + val underlying = { + val scalaMap = JavaConversions.asMap(requestInfo).toMap + kafka.api.FetchRequest( + correlationId = correlationId, + clientId = clientId, + replicaId = replicaId, + maxWait = maxWait, + minBytes = minBytes, + requestInfo = scalaMap + ) + } + + def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) } + + def sizeInBytes = underlying.sizeInBytes + + override def toString = underlying.toString + + override def equals(other: Any) = canEqual(other) && { + val otherFetchRequest = other.asInstanceOf[kafka.javaapi.FetchRequest] + this.underlying.equals(otherFetchRequest.underlying) + } + + def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchRequest] + + override def hashCode = underlying.hashCode + +} + diff --git core/src/main/scala/kafka/javaapi/FetchResponse.scala core/src/main/scala/kafka/javaapi/FetchResponse.scala index 3d0928a..057d7d9 100644 --- core/src/main/scala/kafka/javaapi/FetchResponse.scala +++ core/src/main/scala/kafka/javaapi/FetchResponse.scala @@ -17,17 +17,33 @@ package kafka.javaapi -import kafka.api.TopicData +import kafka.api.PartitionData +import kafka.common.TopicAndPartition class FetchResponse( val versionId: Short, val correlationId: Int, - private val data: Array[TopicData] ) { + private val data: Map[TopicAndPartition, PartitionData] ) { - private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data) + private val underlying = kafka.api.FetchResponse(versionId, correlationId, data) def messageSet(topic: String, partition: Int): kafka.javaapi.message.ByteBufferMessageSet = { import Implicits._ underlying.messageSet(topic, partition) } + + def highWatermark(topic: String, partition: Int) = underlying.highWatermark(topic, partition) + + def hasError = underlying.hasError + + def errorCode(topic: String, partition: Int) = underlying.errorCode(topic, partition) + + override def equals(other: Any) = canEqual(other) && { + val otherFetchResponse = other.asInstanceOf[kafka.javaapi.FetchResponse] + this.underlying.equals(otherFetchResponse.underlying) + } + + def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchResponse] + + override def hashCode = underlying.hashCode } diff --git core/src/main/scala/kafka/javaapi/ProducerRequest.scala core/src/main/scala/kafka/javaapi/ProducerRequest.scala deleted file mode 100644 index 4cf16f5..0000000 --- core/src/main/scala/kafka/javaapi/ProducerRequest.scala +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.javaapi - -import kafka.api.RequestOrResponse -import kafka.api.{RequestKeys, TopicData} -import java.nio.ByteBuffer - -class ProducerRequest(val correlationId: Int, - val clientId: String, - val requiredAcks: Short, - val ackTimeoutMs: Int, - val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { - - val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data) - - def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) } - - def sizeInBytes(): Int = underlying.sizeInBytes - - override def toString: String = - underlying.toString - - override def equals(other: Any): Boolean = underlying.equals(other) - - def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] - - override def hashCode: Int = underlying.hashCode - -} \ No newline at end of file diff --git core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala index c1b9fb9..c4393d0 100644 --- core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala +++ core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala @@ -17,9 +17,8 @@ package kafka.javaapi.consumer -import kafka.api.FetchRequest -import kafka.javaapi.FetchResponse import kafka.utils.threadsafe +import kafka.javaapi.FetchResponse /** * A consumer of kafka messages @@ -32,15 +31,28 @@ class SimpleConsumer(val host: String, val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize) /** - * Fetch a set of messages from a topic. + * Fetch a set of messages from a topic. This version of the fetch method + * takes the Scala version of a fetch request (i.e., + * [[kafka.api.FetchRequest]] and is intended for use with the + * [[kafka.api.FetchRequestBuilder]]. * * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched. * @return a set of fetched messages */ - def fetch(request: FetchRequest): FetchResponse = { + def fetch(request: kafka.api.FetchRequest): FetchResponse = { import kafka.javaapi.Implicits._ underlying.fetch(request) } + + /** + * Fetch a set of messages from a topic. + * + * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched. + * @return a set of fetched messages + */ + def fetch(request: kafka.javaapi.FetchRequest): FetchResponse = { + fetch(request.underlying) + } /** * Get a list of valid offsets (up to maxSize) before the given time. diff --git core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala deleted file mode 100644 index c1ff168..0000000 --- core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.javaapi.producer - -import kafka.producer.SyncProducerConfig -import kafka.javaapi.message.ByteBufferMessageSet -import kafka.api.{ProducerResponse, PartitionData, TopicData} - -class SyncProducer(syncProducer: kafka.producer.SyncProducer) { - - def this(config: SyncProducerConfig) = this(new kafka.producer.SyncProducer(config)) - - val underlying = syncProducer - - def send(producerRequest: kafka.javaapi.ProducerRequest): ProducerResponse = { - underlying.send(producerRequest.underlying) - } - - def send(topic: String, messages: ByteBufferMessageSet): ProducerResponse = { - val partitionData = Array[PartitionData]( new PartitionData(-1, messages.underlying) ) - val data = Array[TopicData]( new TopicData(topic, partitionData) ) - val producerRequest = new kafka.api.ProducerRequest(-1, "", 0, 0, data) - underlying.send(producerRequest) - } - - def close() { - underlying.close - } -} diff --git core/src/main/scala/kafka/network/RequestChannel.scala core/src/main/scala/kafka/network/RequestChannel.scala index 7f407a1..da3488e 100644 --- core/src/main/scala/kafka/network/RequestChannel.scala +++ core/src/main/scala/kafka/network/RequestChannel.scala @@ -23,12 +23,14 @@ import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import java.nio.ByteBuffer import kafka.api._ +import kafka.common.TopicAndPartition + object RequestChannel { val AllDone = new Request(1, 2, getShutdownReceive(), 0) def getShutdownReceive() = { - val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Array[TopicData]()) + val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, PartitionData]()) val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2) byteBuffer.putShort(RequestKeys.ProduceKey) emptyProducerRequest.writeTo(byteBuffer) diff --git core/src/main/scala/kafka/producer/SyncProducer.scala core/src/main/scala/kafka/producer/SyncProducer.scala index 7f865d6..2146061 100644 --- core/src/main/scala/kafka/producer/SyncProducer.scala +++ core/src/main/scala/kafka/producer/SyncProducer.scala @@ -178,4 +178,4 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { object ProducerRequestStat extends KafkaMetricsGroup { val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) val requestSizeHist = newHistogram("ProducerRequestSize") -} \ No newline at end of file +} diff --git core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index c70d968..6772252 100644 --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -18,13 +18,13 @@ package kafka.producer.async import kafka.common._ -import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet} +import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} import kafka.producer._ import kafka.serializer.Encoder import kafka.utils.{Utils, Logging} -import scala.collection.Map +import scala.collection.{Seq, Map} import scala.collection.mutable.{ListBuffer, HashMap} -import kafka.api._ +import kafka.api.{TopicMetadata, ProducerRequest, PartitionData} class DefaultEventHandler[K,V](config: ProducerConfig, @@ -81,12 +81,13 @@ class DefaultEventHandler[K,V](config: ProducerConfig, val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap) val failedTopicPartitions = send(brokerid, messageSetPerBroker) - for( (topic, partition) <- failedTopicPartitions ) { - eventsPerBrokerMap.get((topic, partition)) match { + failedTopicPartitions.foreach(topicPartition => { + eventsPerBrokerMap.get(topicPartition) match { case Some(data) => failedProduceRequests.appendAll(data) case None => // nothing + } - } + }) } } catch { case t: Throwable => error("Failed to send messages", t) @@ -122,8 +123,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig, serializedProducerData } - def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]] = { - val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] + def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]]] = { + val ret = new HashMap[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]] try { for (event <- events) { val topicPartitionsList = getPartitionListForTopic(event) @@ -135,16 +136,16 @@ class DefaultEventHandler[K,V](config: ProducerConfig, // postpone the failure until the send operation, so that requests for other brokers are handled correctly val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1) - var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null + var dataPerBroker: HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]] = null ret.get(leaderBrokerId) match { case Some(element) => - dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]] + dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]]] case None => - dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]] + dataPerBroker = new HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]] ret.put(leaderBrokerId, dataPerBroker) } - val topicAndPartition = (event.getTopic, brokerPartition.partitionId) + val topicAndPartition = TopicAndPartition(event.getTopic, brokerPartition.partitionId) var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null dataPerBroker.get(topicAndPartition) match { case Some(element) => @@ -199,39 +200,30 @@ class DefaultEventHandler[K,V](config: ProducerConfig, * @param messagesPerTopic the messages as a map from (topic, partition) -> messages * @return the set (topic, partitions) messages which incurred an error sending or processing */ - private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Seq[(String, Int)] = { + private def send(brokerId: Int, messagesPerTopic: Map[TopicAndPartition, ByteBufferMessageSet]) = { if(brokerId < 0) { - warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic)) + warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic)) messagesPerTopic.keys.toSeq } else if(messagesPerTopic.size > 0) { - val topics = new HashMap[String, ListBuffer[PartitionData]]() - for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) { - val partitionData = topics.getOrElseUpdate(topicName, new ListBuffer[PartitionData]()) - partitionData.append(new PartitionData(partitionId, messagesSet)) + val topicPartitionDataPairs = messagesPerTopic.toSeq.map { + case (topicAndPartition, messages) => + (topicAndPartition, new PartitionData(topicAndPartition.partition, messages)) } - val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)).toArray val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, - config.requestTimeoutMs, topicData) + config.requestTimeoutMs, Map(topicPartitionDataPairs:_*)) try { val syncProducer = producerPool.getProducer(brokerId) val response = syncProducer.send(producerRequest) - trace("producer sent messages for topics %s to broker %d on %s:%d" + trace("Producer sent messages for topics %s to broker %d on %s:%d" .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port)) - var msgIdx = -1 - val errors = new ListBuffer[(String, Int)] - for( topic <- topicData; partition <- topic.partitionDataArray ) { - msgIdx += 1 - if (msgIdx >= response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError) { - errors.append((topic.topic, partition.partition)) - if (msgIdx < response.errors.size) - warn("Received error " + ErrorMapping.exceptionFor(response.errors(msgIdx)) + - "from broker %d on %s:%d".format(brokerId, topic.topic, partition.partition)) - } - } - errors + if (response.status.size != producerRequest.data.size) + throw new KafkaException("Incomplete response (%s) for producer request (%s)" + .format(response, producerRequest)) + response.status.filter(_._2.error != ErrorMapping.NoError).toSeq + .map(partitionStatus => partitionStatus._1) } catch { - case e => - warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), e) + case t: Throwable => + warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), t) messagesPerTopic.keys.toSeq } } else { @@ -239,7 +231,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, } } - private def groupMessagesToSet(eventsPerTopicAndPartition: Map[(String,Int), Seq[ProducerData[K,Message]]]): Map[(String, Int), ByteBufferMessageSet] = { + private def groupMessagesToSet(eventsPerTopicAndPartition: Map[TopicAndPartition, Seq[ProducerData[K,Message]]]) = { /** enforce the compressed.topics config here. * If the compression codec is anything other than NoCompressionCodec, * Enable compression only for specified topics if any @@ -255,25 +247,23 @@ class DefaultEventHandler[K,V](config: ProducerConfig, ( topicAndPartition, config.compressionCodec match { case NoCompressionCodec => - trace("Sending %d messages with no compression to topic %s on partition %d" - .format(messages.size, topicAndPartition._1, topicAndPartition._2)) + trace("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition)) new ByteBufferMessageSet(NoCompressionCodec, messages: _*) case _ => config.compressedTopics.size match { case 0 => - trace("Sending %d messages with compression codec %d to topic %s on partition %d" - .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2)) + trace("Sending %d messages with compression codec %d to %s" + .format(messages.size, config.compressionCodec.codec, topicAndPartition)) new ByteBufferMessageSet(config.compressionCodec, messages: _*) case _ => - if(config.compressedTopics.contains(topicAndPartition._1)) { - trace("Sending %d messages with compression codec %d to topic %s on partition %d" - .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2)) + if(config.compressedTopics.contains(topicAndPartition.topic)) { + trace("Sending %d messages with compression codec %d to %s" + .format(messages.size, config.compressionCodec.codec, topicAndPartition)) new ByteBufferMessageSet(config.compressionCodec, messages: _*) } else { - trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s" - .format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1, - config.compressedTopics.toString)) + trace("Sending %d messages to %s with no compression as it is not in compressed.topics - %s" + .format(messages.size, topicAndPartition, config.compressedTopics.toString)) new ByteBufferMessageSet(NoCompressionCodec, messages: _*) } } diff --git core/src/main/scala/kafka/server/AbstractFetcherThread.scala core/src/main/scala/kafka/server/AbstractFetcherThread.scala index ed25962..225f83b 100644 --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -19,7 +19,7 @@ package kafka.server import kafka.cluster.Broker import kafka.consumer.SimpleConsumer -import kafka.common.ErrorMapping +import kafka.common.{TopicAndPartition, ErrorMapping} import collection.mutable import kafka.message.ByteBufferMessageSet import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder} @@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int, fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1) extends ShutdownableThread(name) { - private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map + private val fetchMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map private val fetchMapLock = new Object val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize) val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id) @@ -50,7 +50,7 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket def handleOffsetOutOfRange(topic: String, partitionId: Int): Long // deal with partitions with errors, potentially due to leadership changes - def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) + def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) override def shutdown(){ super.shutdown() @@ -65,12 +65,15 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket minBytes(minBytes) fetchMapLock synchronized { - for ( ((topic, partitionId), offset) <- fetchMap ) - builder.addFetch(topic, partitionId, offset.longValue, fetchSize) + fetchMap.foreach { + case((topicAndPartition, offset)) => + builder.addFetch(topicAndPartition.topic, topicAndPartition.partition, + offset, fetchSize) + } } val fetchRequest = builder.build() - val partitionsWithError = new mutable.HashSet[(String, Int)] + val partitionsWithError = new mutable.HashSet[TopicAndPartition] var response: FetchResponse = null try { trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) @@ -90,37 +93,35 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket if (response != null) { // process fetched data fetchMapLock synchronized { - for ( topicData <- response.data ) { - for ( partitionData <- topicData.partitionDataArray) { - val topic = topicData.topic - val partitionId = partitionData.partition - val key = (topic, partitionId) - val currentOffset = fetchMap.get(key) + response.data.foreach { + case(topicAndPartition, partitionData) => + val (topic, partitionId) = topicAndPartition.asTuple + val currentOffset = fetchMap.get(topicAndPartition) if (currentOffset.isDefined) { partitionData.error match { case ErrorMapping.NoError => processPartitionData(topic, currentOffset.get, partitionData) val validBytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes - val newOffset = currentOffset.get + validBytes - fetchMap.put(key, newOffset) + val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes + fetchMap.put(topicAndPartition, newOffset) FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset fetcherMetrics.byteRate.mark(validBytes) case ErrorMapping.OffsetOutOfRangeCode => val newOffset = handleOffsetOutOfRange(topic, partitionId) - fetchMap.put(key, newOffset) + fetchMap.put(topicAndPartition, newOffset) warn("current offset %d for topic %s partition %d out of range; reset offset to %d" - .format(currentOffset.get, topic, partitionId, newOffset)) + .format(currentOffset.get, topic, partitionId, newOffset)) case _ => error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host), - ErrorMapping.exceptionFor(partitionData.error)) - partitionsWithError += key - fetchMap.remove(key) + ErrorMapping.exceptionFor(partitionData.error)) + partitionsWithError += topicAndPartition + fetchMap.remove(topicAndPartition) } } - } } } } + if (partitionsWithError.size > 0) { debug("handling partitions with error for %s".format(partitionsWithError)) handlePartitionsWithErrors(partitionsWithError) @@ -129,19 +130,19 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket def addPartition(topic: String, partitionId: Int, initialOffset: Long) { fetchMapLock synchronized { - fetchMap.put((topic, partitionId), initialOffset) + fetchMap.put(TopicAndPartition(topic, partitionId), initialOffset) } } def removePartition(topic: String, partitionId: Int) { fetchMapLock synchronized { - fetchMap.remove((topic, partitionId)) + fetchMap.remove(TopicAndPartition(topic, partitionId)) } } def hasPartition(topic: String, partitionId: Int): Boolean = { fetchMapLock synchronized { - fetchMap.get((topic, partitionId)).isDefined + fetchMap.get(TopicAndPartition(topic, partitionId)).isDefined } } diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala index e9df147..889c867 100644 --- core/src/main/scala/kafka/server/KafkaApis.scala +++ core/src/main/scala/kafka/server/KafkaApis.scala @@ -20,7 +20,6 @@ package kafka.server import java.io.IOException import kafka.admin.{CreateTopicCommand, AdminUtils} import kafka.api._ -import kafka.common._ import kafka.message._ import kafka.network._ import kafka.utils.{TopicNameValidator, Pool, SystemTime, Logging} @@ -32,6 +31,7 @@ import kafka.network.RequestChannel.Response import java.util.concurrent.TimeUnit import kafka.metrics.KafkaMetricsGroup import org.I0Itec.zkclient.ZkClient +import kafka.common._ /** * Logic to handle the various Kafka requests @@ -40,13 +40,14 @@ class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager, val zkClient: ZkClient, brokerId: Int) extends Logging { - private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId) - private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel) + + private val producerRequestPurgatory = new ProducerRequestPurgatory + private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel) private val delayedRequestMetrics = new DelayedRequestMetrics private val topicNameValidator = new TopicNameValidator(replicaManager.config.maxTopicNameLength) private val requestLogger = Logger.getLogger("kafka.request.logger") - this.logIdent = "[KafkaApi on Broker " + brokerId + "], " + this.logIdent = "[KafkaApi-%d] ".format(brokerId) /** * Top-level method that handles all requests and multiplexes to the right api @@ -93,18 +94,18 @@ class KafkaApis(val requestChannel: RequestChannel, } /** - * Check if the partitionDataArray from a produce request can unblock any + * Check if a partitionData from a produce request can unblock any * DelayedFetch requests. */ - def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) { - var satisfied = new mutable.ArrayBuffer[DelayedFetch] - for(partitionData <- partitionDatas) - satisfied ++= fetchRequestPurgatory.update(RequestKey(topic, partitionData.partition), null) - trace("Producer request to %s unblocked %d fetch requests.".format(topic, satisfied.size)) + def maybeUnblockDelayedFetchRequests(topic: String, partitionData: PartitionData) { + val partition = partitionData.partition + val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), null) + trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size)) + // send any newly unblocked responses for(fetchReq <- satisfied) { val topicData = readMessageSets(fetchReq.fetch) - val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData) + val response = FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData) requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response))) } } @@ -119,28 +120,25 @@ class KafkaApis(val requestChannel: RequestChannel, requestLogger.trace("Handling producer request " + request.toString) trace("Handling producer request " + request.toString) - val response = produceToLocalLog(produceRequest) + val localProduceResponse = produceToLocalLog(produceRequest) debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) - val partitionsInError = response.errors.count(_ != ErrorMapping.NoError) - - for (topicData <- produceRequest.data) - maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray) - - if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1 || - produceRequest.data.size <= 0 || partitionsInError == response.errors.size) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + + val numPartitionsInError = localProduceResponse.status.count(_._2.error != ErrorMapping.NoError) + produceRequest.data.foreach(partitionAndData => + maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._2)) + + if (produceRequest.requiredAcks == 0 || + produceRequest.requiredAcks == 1 || + produceRequest.numPartitions <= 0 || + numPartitionsInError == produceRequest.numPartitions) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(localProduceResponse))) else { // create a list of (topic, partition) pairs to use as keys for this delayed request - val producerRequestKeys = produceRequest.data.flatMap(topicData => { - val topic = topicData.topic - topicData.partitionDataArray.map(partitionData => { - RequestKey(topic, partitionData.partition) - }) - }) + val producerRequestKeys = produceRequest.data.keys.map( + topicAndPartition => new RequestKey(topicAndPartition)).toSeq val delayedProduce = new DelayedProduce( - producerRequestKeys, request, - response.errors, response.offsets, + producerRequestKeys, request, localProduceResponse, produceRequest, produceRequest.ackTimeoutMs.toLong) producerRequestPurgatory.watch(delayedProduce) @@ -164,43 +162,41 @@ class KafkaApis(val requestChannel: RequestChannel, */ private def produceToLocalLog(request: ProducerRequest): ProducerResponse = { trace("Produce [%s] to local log ".format(request.toString)) - val requestSize = request.topicPartitionCount - val errors = new Array[Short](requestSize) - val offsets = new Array[Long](requestSize) - - var msgIndex = -1 - for(topicData <- request.data) { - for(partitionData <- topicData.partitionDataArray) { - msgIndex += 1 - BrokerTopicStat.getBrokerTopicStat(topicData.topic).bytesInRate.mark(partitionData.messages.sizeInBytes) - BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes) - try { - val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition) - val log = localReplica.log.get - log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) - // we may need to increment high watermark since ISR could be down to 1 - localReplica.partition.maybeIncrementLeaderHW(localReplica) - offsets(msgIndex) = log.logEndOffset - errors(msgIndex) = ErrorMapping.NoError.toShort - trace("%d bytes written to logs, nextAppendOffset = %d" - .format(partitionData.messages.sizeInBytes, offsets(msgIndex))) - } catch { - case e => - BrokerTopicStat.getBrokerTopicStat(topicData.topic).failedProduceRequestRate.mark() - BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark() - error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e) - e match { - case _: IOException => - fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) - System.exit(1) - case _ => - errors(msgIndex) = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort - offsets(msgIndex) = -1 - } - } + + val localErrorsAndOffsets = request.data.map (topicAndPartitionData => { + val (topic, partitionData) = (topicAndPartitionData._1.topic, topicAndPartitionData._2) + BrokerTopicStat.getBrokerTopicStat(topic).bytesInRate.mark(partitionData.messages.sizeInBytes) + BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes) + + try { + val localReplica = replicaManager.getLeaderReplicaIfLocal(topic, partitionData.partition) + val log = localReplica.log.get + log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) + // we may need to increment high watermark since ISR could be down to 1 + localReplica.partition.maybeIncrementLeaderHW(localReplica) + val responseStatus = ProducerResponseStatus(ErrorMapping.NoError, log.logEndOffset) + trace("%d bytes written to logs, nextAppendOffset = %d" + .format(partitionData.messages.sizeInBytes, responseStatus.nextOffset)) + (TopicAndPartition(topic, partitionData.partition), responseStatus) + } catch { + case e: Throwable => + BrokerTopicStat.getBrokerTopicStat(topic).failedProduceRequestRate.mark() + BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark() + error("Error processing ProducerRequest on %s:%d".format(topic, partitionData.partition), e) + e match { + case _: IOException => + fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) + // compiler requires scala.sys.exit (not System.exit). + exit(1) + case _ => + val (error, offset) = (ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1L) + (TopicAndPartition(topic, partitionData.partition), ProducerResponseStatus(error, offset)) + } } } - new ProducerResponse(request.versionId, request.correlationId, errors, offsets) + ) + + ProducerResponse(request.versionId, request.correlationId, localErrorsAndOffsets) } /** @@ -212,27 +208,17 @@ class KafkaApis(val requestChannel: RequestChannel, requestLogger.trace("Handling fetch request " + fetchRequest.toString) trace("Handling fetch request " + fetchRequest.toString) - // validate the request - try { - fetchRequest.validate() - } catch { - case e:FetchRequestFormatException => - val response = new FetchResponse(fetchRequest.versionId, fetchRequest.correlationId, Array.empty) - val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response)) - requestChannel.sendResponse(channelResponse) - } - if(fetchRequest.isFromFollower) { maybeUpdatePartitionHW(fetchRequest) // after updating HW, some delayed produce requests may be unblocked var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] - fetchRequest.offsetInfo.foreach(topicOffsetInfo => { - topicOffsetInfo.partitions.foreach(partition => { - val key = RequestKey(topicOffsetInfo.topic, partition) + fetchRequest.requestInfo.foreach { + case (topicAndPartition, _) => + val key = new RequestKey(topicAndPartition) satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key) - }) - }) - debug("Replica %d fetch unblocked %d producer requests.".format(fetchRequest.replicaId, satisfiedProduceRequests.size)) + } + debug("Replica %d fetch unblocked %d producer requests." + .format(fetchRequest.replicaId, satisfiedProduceRequests.size)) satisfiedProduceRequests.foreach(_.respond()) } @@ -243,13 +229,13 @@ class KafkaApis(val requestChannel: RequestChannel, fetchRequest.numPartitions <= 0) { val topicData = readMessageSets(fetchRequest) debug("Returning fetch response %s for fetch request with correlation id %d".format( - topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId)) - val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData) + topicData.values.map(_.error).mkString(","), fetchRequest.correlationId)) + val response = FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) } else { debug("Putting fetch request into purgatory") // create a list of (topic, partition) pairs to use as keys for this delayed request - val delayedFetchKeys = fetchRequest.offsetInfo.flatMap(o => o.partitions.map(RequestKey(o.topic, _))) + val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_)) val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait) fetchRequestPurgatory.watch(delayedFetch) } @@ -259,86 +245,77 @@ class KafkaApis(val requestChannel: RequestChannel, * Calculate the number of available bytes for the given fetch request */ private def availableFetchBytes(fetchRequest: FetchRequest): Long = { - var totalBytes = 0L - for(offsetDetail <- fetchRequest.offsetInfo) { - for(i <- 0 until offsetDetail.partitions.size) { - debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i))) + val totalBytes = fetchRequest.requestInfo.foldLeft(0L)((folded, curr) => { + folded + + { + val (topic, partition) = (curr._1.topic, curr._1.partition) + val (offset, fetchSize) = (curr._2.offset, curr._2.fetchSize) + debug("Fetching log for topic %s partition %d".format(topic, partition)) try { - val leader = replicaManager.getLeaderReplicaIfLocal(offsetDetail.topic, offsetDetail.partitions(i)) + val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition) val end = if (!fetchRequest.isFromFollower) { leader.highWatermark } else { leader.logEndOffset } - val available = max(0, end - offsetDetail.offsets(i)) - totalBytes += math.min(offsetDetail.fetchSizes(i), available) + val available = max(0, end - offset) + math.min(fetchSize, available) } catch { case e: UnknownTopicOrPartitionException => - info("Invalid partition %d in fetch request from client %d." - .format(offsetDetail.partitions(i), fetchRequest.clientId)) + info("Invalid partition %d in fetch request from client %s." + .format(partition, fetchRequest.clientId)) + 0 case e => error("Error determining available fetch bytes for topic %s partition %s on broker %s for client %s" - .format(offsetDetail.topic, offsetDetail.partitions(i), brokerId, fetchRequest.clientId), e) + .format(topic, partition, brokerId, fetchRequest.clientId), e) + 0 } } - } + }) trace(totalBytes + " available bytes for fetch request.") totalBytes } private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) { - val offsets = fetchRequest.offsetInfo - debug("Act on update partition HW, check offset detail: %s ".format(offsets)) - for(offsetDetail <- offsets) { - val topic = offsetDetail.topic - val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets) - for( (partition, offset) <- (partitions, offsets).zipped.map((_,_))) { - replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset) - } - } + debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest)) + fetchRequest.requestInfo.foreach(info => { + val (topic, partition, offset) = (info._1.topic, info._1.partition, info._2.offset) + replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset) + }) } /** - * Read from all the offset details given and produce an array of topic datas + * Read from all the offset details given and return a map of + * (topic, partition) -> PartitionData */ - private def readMessageSets(fetchRequest: FetchRequest): Array[TopicData] = { - val offsets = fetchRequest.offsetInfo - val fetchedData = new mutable.ArrayBuffer[TopicData]() - - for(offsetDetail <- offsets) { - val info = new mutable.ArrayBuffer[PartitionData]() - val topic = offsetDetail.topic - val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes) - for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) { - val isFetchFromFollower = fetchRequest.isFromFollower() - val partitionInfo = - try { - val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower) - BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes) - BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes) - if (!isFetchFromFollower) { - new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages) - } else { - debug("Leader %d for topic %s partition %d received fetch request from follower %d" - .format(brokerId, topic, partition, fetchRequest.replicaId)) - debug("Leader %d returning %d messages for topic %s partition %d to follower %d" - .format(brokerId, messages.sizeInBytes, topic, partition, fetchRequest.replicaId)) - new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages) - } - } - catch { - case e => - BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark() - BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark() - error("error when processing request " + (topic, partition, offset, fetchSize), e) - new PartitionData(partition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), - offset, -1L, MessageSet.Empty) + private def readMessageSets(fetchRequest: FetchRequest) = { + val isFetchFromFollower = fetchRequest.isFromFollower + fetchRequest.requestInfo.map { + case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) => + val partitionData = try { + val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower) + BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes) + BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes) + if (!isFetchFromFollower) { + new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages) + } else { + debug("Leader %d for topic %s partition %d received fetch request from follower %d" + .format(brokerId, topic, partition, fetchRequest.replicaId)) + debug("Leader %d returning %d messages for topic %s partition %d to follower %d" + .format(brokerId, messages.sizeInBytes, topic, partition, fetchRequest.replicaId)) + new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages) } - info.append(partitionInfo) - } - fetchedData.append(new TopicData(topic, info.toArray)) + } + catch { + case t: Throwable => + BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark() + BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark() + error("error when processing request " + (topic, partition, offset, fetchSize), t) + new PartitionData(partition, ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), + offset, -1L, MessageSet.Empty) + } + (TopicAndPartition(topic, partition), partitionData) } - fetchedData.toArray } /** @@ -454,8 +431,14 @@ class KafkaApis(val requestChannel: RequestChannel, private [kafka] case class RequestKey(topic: String, partition: Int) extends MetricKey { + + def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition) + + def topicAndPartition = TopicAndPartition(topic, partition) + override def keyLabel = "%s-%d".format(topic, partition) } + /** * A delayed fetch request */ @@ -465,9 +448,9 @@ class KafkaApis(val requestChannel: RequestChannel, /** * A holding pen for fetch requests waiting to be satisfied */ - class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) { + class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) { - this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId) + this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId) /** * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field @@ -480,7 +463,7 @@ class KafkaApis(val requestChannel: RequestChannel, */ def expire(delayed: DelayedFetch) { val topicData = readMessageSets(delayed.fetch) - val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData) + val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData) val fromFollower = delayed.fetch.isFromFollower delayedRequestMetrics.recordDelayedFetchExpired(fromFollower) requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response))) @@ -489,48 +472,43 @@ class KafkaApis(val requestChannel: RequestChannel, class DelayedProduce(keys: Seq[RequestKey], request: RequestChannel.Request, - localErrors: Array[Short], - requiredOffsets: Array[Long], + localProduceResponse: ProducerResponse, val produce: ProducerRequest, delayMs: Long) extends DelayedRequest(keys, request, delayMs) with Logging { + private val initialErrorsAndOffsets = localProduceResponse.status /** * Map of (topic, partition) -> partition status * The values in this map don't need to be synchronized since updates to the * values are effectively synchronized by the ProducerRequestPurgatory's * update method */ - private [kafka] val partitionStatus = keys.map(key => { - val keyIndex = keys.indexOf(key) + private [kafka] val partitionStatus = keys.map(requestKey => { + val producerResponseStatus = initialErrorsAndOffsets(TopicAndPartition(requestKey.topic, requestKey.partition)) // if there was an error in writing to the local replica's log, then don't // wait for acks on this partition - val acksPending = - if (localErrors(keyIndex) == ErrorMapping.NoError) { + val (acksPending, error, nextOffset) = + if (producerResponseStatus.error == ErrorMapping.NoError) { // Timeout error state will be cleared when requiredAcks are received - localErrors(keyIndex) = ErrorMapping.RequestTimedOutCode - true + (true, ErrorMapping.RequestTimedOutCode, producerResponseStatus.nextOffset) } - else - false + else (false, producerResponseStatus.error, producerResponseStatus.nextOffset) - val initialStatus = new PartitionStatus(acksPending, localErrors(keyIndex), requiredOffsets(keyIndex)) - trace("Initial partition status for %s = %s".format(key, initialStatus)) - (key, initialStatus) + val initialStatus = PartitionStatus(acksPending, error, nextOffset) + trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus)) + (requestKey, initialStatus) }).toMap - def respond() { - val errorsAndOffsets: (List[Short], List[Long]) = ( - keys.foldRight - ((List[Short](), List[Long]())) - ((key: RequestKey, result: (List[Short], List[Long])) => { - val status = partitionStatus(key) - (status.error :: result._1, status.requiredOffset :: result._2) - }) - ) - val response = new ProducerResponse(produce.versionId, produce.correlationId, - errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray) + + val finalErrorsAndOffsets = initialErrorsAndOffsets.map( + status => { + val pstat = partitionStatus(new RequestKey(status._1)) + (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset)) + }) + + val response = ProducerResponse(produce.versionId, produce.correlationId, finalErrorsAndOffsets) requestChannel.sendResponse(new RequestChannel.Response( request, new BoundedByteBufferSend(response))) @@ -565,9 +543,8 @@ class KafkaApis(val requestChannel: RequestChannel, fetchPartitionStatus.error = ErrorMapping.NoError } if (!fetchPartitionStatus.acksPending) { - val topicData = produce.data.find(_.topic == topic).get - val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get - maybeUnblockDelayedFetchRequests(topic, Array(partitionData)) + val partitionData = produce.data(followerFetchRequestKey.topicAndPartition) + maybeUnblockDelayedFetchRequests(topic, partitionData) } } @@ -576,7 +553,7 @@ class KafkaApis(val requestChannel: RequestChannel, satisfied } - class PartitionStatus(var acksPending: Boolean, + case class PartitionStatus(var acksPending: Boolean, var error: Short, val requiredOffset: Long) { def setThisBrokerNotLeader() { @@ -594,9 +571,9 @@ class KafkaApis(val requestChannel: RequestChannel, /** * A holding pen for produce requests waiting to be satisfied. */ - private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) { + private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) { - this.logIdent = "[ProducerRequestPurgatory-%d], ".format(brokerId) + this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId) protected def checkSatisfied(followerFetchRequestKey: RequestKey, delayedProduce: DelayedProduce) = diff --git core/src/main/scala/kafka/server/ReplicaFetcherManager.scala core/src/main/scala/kafka/server/ReplicaFetcherManager.scala index 5628a7b..334ce26 100644 --- core/src/main/scala/kafka/server/ReplicaFetcherManager.scala +++ core/src/main/scala/kafka/server/ReplicaFetcherManager.scala @@ -23,7 +23,7 @@ class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val r extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) { override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { - new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d on broker %d, ".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr) + new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d, ".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr) } def shutdown() { diff --git core/src/main/scala/kafka/server/ReplicaFetcherThread.scala core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 7c8ff4e..2fb0630 100644 --- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -20,6 +20,8 @@ package kafka.server import kafka.api.{OffsetRequest, PartitionData} import kafka.cluster.Broker import kafka.message.ByteBufferMessageSet +import kafka.common.TopicAndPartition + class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager) extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs, @@ -56,7 +58,7 @@ class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: Kafk } // any logic for partitions whose leader has changed - def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) { + def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) { // no handler needed since the controller will make the changes accordingly } } diff --git core/src/main/scala/kafka/utils/Utils.scala core/src/main/scala/kafka/utils/Utils.scala index 9d9c084..e999141 100644 --- core/src/main/scala/kafka/utils/Utils.scala +++ core/src/main/scala/kafka/utils/Utils.scala @@ -367,7 +367,7 @@ object Utils extends Logging { /** * Read an unsigned integer from the given position without modifying the buffers * position - * @param The buffer to read from + * @param buffer the buffer to read from * @param index the index from which to read the integer * @return The integer read, as a long to avoid signedness */ diff --git core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala index 950aeaa..868bf02 100644 --- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala +++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala @@ -72,7 +72,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness // send an invalid offset try { val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build()) - fetchedWithError.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))) + fetchedWithError.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)) fail("Expected an OffsetOutOfRangeException exception to be thrown") } catch { case e: OffsetOutOfRangeException => @@ -101,23 +101,22 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness } } - { - // send some invalid offsets - val builder = new FetchRequestBuilder() - for( (topic, offset) <- topicOffsets ) - builder.addFetch(topic, offset, -1, 10000) + // send some invalid offsets + val builder = new FetchRequestBuilder() + for( (topic, offset) <- topicOffsets ) + builder.addFetch(topic, offset, -1, 10000) + + val request = builder.build() + val responses = consumer.fetch(request) + responses.data.values.foreach(pd => { + try { + ErrorMapping.maybeThrowException(pd.error) + fail("Expected an OffsetOutOfRangeException exception to be thrown") + } catch { + case e: OffsetOutOfRangeException => - val request = builder.build() - val responses = consumer.fetch(request) - for(topicData <- responses.data) { - try { - topicData.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)) - fail("Expected an OffsetOutOfRangeException exception to be thrown") - } catch { - case e: OffsetOutOfRangeException => - } } - } + }) } def testMultiProduce() { diff --git core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 604e2af..ef33238 100644 --- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -19,7 +19,7 @@ package kafka.integration import java.nio.ByteBuffer import junit.framework.Assert._ -import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder} +import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder} import kafka.server.{KafkaRequestHandler, KafkaConfig} import java.util.Properties import kafka.producer.{ProducerData, Producer, ProducerConfig} @@ -31,8 +31,8 @@ import org.I0Itec.zkclient.ZkClient import kafka.zk.ZooKeeperTestHarness import org.scalatest.junit.JUnit3Suite import scala.collection._ -import kafka.common.{ErrorMapping, UnknownTopicOrPartitionException, FetchRequestFormatException, OffsetOutOfRangeException} import kafka.admin.{AdminUtils, CreateTopicCommand} +import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException} /** * End to end tests of the primitive apis against a local server @@ -77,27 +77,11 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with assertEquals(request, deserializedRequest) } - def testFetchRequestEnforcesUniqueTopicsForOffsetDetails() { - val offsets = Array( - new OffsetDetail("topic1", Array(0, 1, 2), Array(0L, 0L, 0L), Array(1000, 1000, 1000)), - new OffsetDetail("topic2", Array(0, 1, 2), Array(0L, 0L, 0L), Array(1000, 1000, 1000)), - new OffsetDetail("topic1", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000)), - new OffsetDetail("topic2", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000)) - ) - val request = new FetchRequest(offsetInfo = offsets) - try { - consumer.fetch(request) - fail("FetchRequest should throw FetchRequestFormatException due to duplicate topics") - } catch { - case e: FetchRequestFormatException => "success" - } - } - def testEmptyFetchRequest() { - val offsets = Array[OffsetDetail]() - val request = new FetchRequest(offsetInfo = offsets) + val partitionRequests = immutable.Map[TopicAndPartition, PartitionFetchInfo]() + val request = new FetchRequest(requestInfo = partitionRequests) val fetched = consumer.fetch(request) - assertTrue(fetched.errorCode == ErrorMapping.NoError && fetched.data.size == 0) + assertTrue(!fetched.hasError && fetched.data.size == 0) } def testDefaultEncoderProducerAndFetch() { @@ -189,7 +173,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with try { val request = builder.build() val response = consumer.fetch(request) - response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))) + response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)) fail("Expected exception when fetching message with invalid offset") } catch { case e: OffsetOutOfRangeException => "this is good" @@ -205,7 +189,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with try { val request = builder.build() val response = consumer.fetch(request) - response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))) + response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)) fail("Expected exception when fetching message with invalid partition") } catch { case e: UnknownTopicOrPartitionException => "this is good" @@ -253,7 +237,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with try { val request = builder.build() val response = consumer.fetch(request) - response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))) + response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)) fail("Expected exception when fetching message with invalid offset") } catch { case e: OffsetOutOfRangeException => "this is good" @@ -269,7 +253,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with try { val request = builder.build() val response = consumer.fetch(request) - response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))) + response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)) fail("Expected exception when fetching message with invalid partition") } catch { case e: UnknownTopicOrPartitionException => "this is good" diff --git core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala deleted file mode 100644 index 3b1aa2a..0000000 --- core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.javaapi.integration - -import org.scalatest.junit.JUnit3Suite -import java.util.Properties -import kafka.producer.SyncProducerConfig -import kafka.javaapi.producer.SyncProducer -import kafka.javaapi.consumer.SimpleConsumer - -trait ProducerConsumerTestHarness extends JUnit3Suite { - - val port: Int - val host = "localhost" - var producer: SyncProducer = null - var consumer: SimpleConsumer = null - - override def setUp() { - val props = new Properties() - props.put("host", host) - props.put("port", port.toString) - props.put("buffer.size", "65536") - props.put("connect.timeout.ms", "100000") - props.put("reconnect.interval", "10000") - producer = new SyncProducer(new SyncProducerConfig(props)) - consumer = new SimpleConsumer(host, - port, - 1000000, - 64*1024) - super.setUp - } - - override def tearDown() { - super.tearDown - producer.close() - consumer.close() - } -} diff --git core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala index d0b187e..fe7c951 100644 --- core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala +++ core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package kafka.network; +package kafka.network import org.junit._ import org.scalatest.junit.JUnitSuite @@ -24,31 +24,45 @@ import java.nio.ByteBuffer import kafka.api._ import kafka.message.{Message, ByteBufferMessageSet} import kafka.cluster.Broker -import kafka.common.ErrorMapping import collection.mutable._ +import kafka.common.{TopicAndPartition, ErrorMapping} + object RpcDataSerializationTestUtils{ private val topic1 = "test1" private val topic2 = "test2" - private val leader1 = 0; + private val leader1 = 0 private val isr1 = List(0, 1, 2) - private val leader2 = 0; + private val leader2 = 0 private val isr2 = List(0, 2, 3) private val partitionData0 = new PartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes))) private val partitionData1 = new PartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes))) private val partitionData2 = new PartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes))) private val partitionData3 = new PartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes))) private val partitionDataArray = Array(partitionData0, partitionData1, partitionData2, partitionData3) - private val topicData1 = new TopicData(topic1, partitionDataArray) - private val topicData2 = new TopicData(topic2, partitionDataArray) - private val topicDataArray = Array(topicData1, topicData2) - private val offsetDetail1 = new OffsetDetail(topic1, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100)) - private val offsetDetail2 = new OffsetDetail(topic2, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100)) - private val offsetDetailSeq = Seq(offsetDetail1, offsetDetail2) - private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) - private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) - private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) - private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) + + private val topicData = { + val groupedData = Array(topic1, topic2).flatMap(topic => + partitionDataArray.map(partitionData => + (TopicAndPartition(topic, partitionData.partition), partitionData))) + collection.immutable.Map(groupedData:_*) + } + + private val requestInfos = collection.immutable.Map( + TopicAndPartition(topic1, 0) -> PartitionFetchInfo(1000, 100), + TopicAndPartition(topic1, 1) -> PartitionFetchInfo(2000, 100), + TopicAndPartition(topic1, 2) -> PartitionFetchInfo(3000, 100), + TopicAndPartition(topic1, 3) -> PartitionFetchInfo(4000, 100), + TopicAndPartition(topic2, 0) -> PartitionFetchInfo(1000, 100), + TopicAndPartition(topic2, 1) -> PartitionFetchInfo(2000, 100), + TopicAndPartition(topic2, 2) -> PartitionFetchInfo(3000, 100), + TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100) + ) + + private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty) + private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty) + private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty) + private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty) private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3) private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq) private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq) @@ -78,19 +92,21 @@ object RpcDataSerializationTestUtils{ } def createTestProducerRequest: ProducerRequest = { - new ProducerRequest(1, "client 1", 0, 1000, topicDataArray) + new ProducerRequest(1, "client 1", 0, 1000, topicData) } - def createTestProducerResponse: ProducerResponse = { - new ProducerResponse(1, 1, Array(0.toShort, 0.toShort), Array(1000l, 2000l), 0) - } + def createTestProducerResponse: ProducerResponse = + ProducerResponse(1, 1, Map( + TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001), + TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001) + )) def createTestFetchRequest: FetchRequest = { - new FetchRequest(offsetInfo = offsetDetailSeq) + new FetchRequest(requestInfo = requestInfos) } def createTestFetchResponse: FetchResponse = { - new FetchResponse(1, 1, topicDataArray) + FetchResponse(1, 1, topicData) } def createTestOffsetRequest: OffsetRequest = { @@ -154,7 +170,7 @@ class RpcDataSerializationTest extends JUnitSuite { assertEquals("The original and deserialzed stopReplicaResponse should be the same", stopReplicaResponse, deserializedStopReplicaResponse) - buffer = ByteBuffer.allocate(producerRequest.sizeInBytes()) + buffer = ByteBuffer.allocate(producerRequest.sizeInBytes) producerRequest.writeTo(buffer) buffer.rewind() val deserializedProducerRequest = ProducerRequest.readFrom(buffer) diff --git core/src/test/scala/unit/kafka/network/SocketServerTest.scala core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 868d23f..f4dcc13 100644 --- core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -25,8 +25,10 @@ import kafka.utils.TestUtils import java.util.Random import junit.framework.Assert._ import kafka.producer.SyncProducerConfig -import kafka.api.{TopicData, ProducerRequest} +import kafka.api.{PartitionData, ProducerRequest} import java.nio.ByteBuffer +import kafka.common.TopicAndPartition + class SocketServerTest extends JUnitSuite { @@ -75,9 +77,10 @@ class SocketServerTest extends JUnitSuite { val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs val ack = SyncProducerConfig.DefaultRequiredAcks - val emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]()) + val emptyRequest = + new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, PartitionData]()) - val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes()) + val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes) emptyRequest.writeTo(byteBuffer) byteBuffer.rewind() val serializedBytes = new Array[Byte](byteBuffer.remaining) diff --git core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index b21712d..aba070e 100644 --- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -201,11 +201,11 @@ class AsyncProducerTest extends JUnit3Suite { topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes)))) val expectedResult = Some(Map( 0 -> Map( - ("topic1", 0) -> topic1Broker1Data, - ("topic2", 0) -> topic2Broker1Data), + TopicAndPartition("topic1", 0) -> topic1Broker1Data, + TopicAndPartition("topic2", 0) -> topic2Broker1Data), 1 -> Map( - ("topic1", 1) -> topic1Broker2Data, - ("topic2", 1) -> topic2Broker2Data) + TopicAndPartition("topic1", 1) -> topic1Broker2Data, + TopicAndPartition("topic2", 1) -> topic2Broker2Data) )) val actualResult = handler.partitionAndCollate(producerDataList) @@ -344,7 +344,7 @@ class AsyncProducerTest extends JUnit3Suite { partitionedDataOpt match { case Some(partitionedData) => for ((brokerId, dataPerBroker) <- partitionedData) { - for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker) + for ( (TopicAndPartition(topic, partitionId), dataPerTopic) <- dataPerBroker) assertTrue(partitionId == 0) } case None => @@ -408,10 +408,12 @@ class AsyncProducerTest extends JUnit3Suite { // entirely. The second request will succeed for partition 1 but fail for partition 0. // On the third try for partition 0, let it succeed. val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), TestUtils.messagesToSet(msgs), 0) - val response1 = - new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L)) + val response1 = ProducerResponse(ProducerRequest.CurrentVersion, 0, + Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)), + (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) val request2 = TestUtils.produceRequest(topic1, 0, TestUtils.messagesToSet(msgs)) - val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)) + val response2 = ProducerResponse(ProducerRequest.CurrentVersion, 0, + Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1) diff --git core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index c1d9dd0..4eb36c8 100644 --- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -21,14 +21,14 @@ import java.net.SocketTimeoutException import java.util.Properties import junit.framework.Assert import kafka.admin.CreateTopicCommand -import kafka.common.{ErrorMapping, MessageSizeTooLargeException} import kafka.integration.KafkaServerTestHarness -import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, Message, ByteBufferMessageSet} +import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} import kafka.server.KafkaConfig import kafka.utils.{TestZKUtils, SystemTime, TestUtils} import org.junit.Test import org.scalatest.junit.JUnit3Suite -import kafka.api.TopicData +import kafka.api.{ProducerResponseStatus, PartitionData} +import kafka.common.{TopicAndPartition, ErrorMapping} class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { private var messageBytes = new Array[Byte](2); @@ -85,11 +85,11 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs val ack = SyncProducerConfig.DefaultRequiredAcks - val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]()) + val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, PartitionData]()) val producer = new SyncProducer(new SyncProducerConfig(props)) val response = producer.send(emptyRequest) - Assert.assertTrue(response.errorCode == ErrorMapping.NoError && response.errors.size == 0 && response.offsets.size == 0) + Assert.assertTrue(!response.hasError && response.status.size == 0) } @Test @@ -109,17 +109,17 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1) val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1)) - Assert.assertEquals(1, response1.errors.length) - Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.errors(0)) - Assert.assertEquals(-1L, response1.offsets(0)) + Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError)) + Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error) + Assert.assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).nextOffset) val message2 = new Message(new Array[Byte](1000000)) val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2) val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2)) - Assert.assertEquals(1, response2.errors.length) - Assert.assertEquals(ErrorMapping.NoError, response2.errors(0)) - Assert.assertEquals(messageSet2.sizeInBytes, response2.offsets(0)) + Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError)) + Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("test", 0)).error) + Assert.assertEquals(messageSet2.sizeInBytes, response2.status(TopicAndPartition("test", 0)).nextOffset) } @Test @@ -142,10 +142,12 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { Assert.assertNotNull(response) Assert.assertEquals(request.correlationId, response.correlationId) - Assert.assertEquals(response.errors.length, response.offsets.length) - Assert.assertEquals(3, response.errors.length) - response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, _)) - response.offsets.foreach(Assert.assertEquals(-1L, _)) + Assert.assertEquals(3, response.status.size) + response.status.values.foreach { + case ProducerResponseStatus(error, nextOffset) => + Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, error) + Assert.assertEquals(-1L, nextOffset) + } // #2 - test that we get correct offsets when partition is owned by broker CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1) @@ -156,18 +158,18 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val response2 = producer.send(request) Assert.assertNotNull(response2) Assert.assertEquals(request.correlationId, response2.correlationId) - Assert.assertEquals(response2.errors.length, response2.offsets.length) - Assert.assertEquals(3, response2.errors.length) + Assert.assertEquals(3, response2.status.size) // the first and last message should have been accepted by broker - Assert.assertEquals(0, response2.errors(0)) - Assert.assertEquals(0, response2.errors(2)) - Assert.assertEquals(messages.sizeInBytes, response2.offsets(0)) - Assert.assertEquals(messages.sizeInBytes, response2.offsets(2)) + Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("topic1", 0)).error) + Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("topic3", 0)).error) + Assert.assertEquals(messages.sizeInBytes, response2.status(TopicAndPartition("topic1", 0)).nextOffset) + Assert.assertEquals(messages.sizeInBytes, response2.status(TopicAndPartition("topic3", 0)).nextOffset) // the middle message should have been rejected because broker doesn't lead partition - Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, response2.errors(1)) - Assert.assertEquals(-1, response2.offsets(1)) + Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, + response2.status(TopicAndPartition("topic2", 0)).error) + Assert.assertEquals(-1, response2.status(TopicAndPartition("topic2", 0)).nextOffset) } @Test diff --git core/src/test/scala/unit/kafka/utils/TestUtils.scala core/src/test/scala/unit/kafka/utils/TestUtils.scala index 5ca1db6..0e47daf 100644 --- core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -34,7 +34,9 @@ import kafka.consumer.ConsumerConfig import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.TimeUnit import kafka.api._ +import collection.mutable.Map import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} +import kafka.common.TopicAndPartition /** @@ -364,28 +366,10 @@ object TestUtils extends Logging { val correlationId = SyncProducerConfig.DefaultCorrelationId val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs - val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray)) - new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, data.toArray) - } - - def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = { - produceJavaRequest(-1,topic,-1,message) - } - - def produceJavaRequest(topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = { - produceJavaRequest(-1,topic,partition,message) - } - - def produceJavaRequest(correlationId: Int, topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = { - val clientId = "test" - val requiredAcks: Short = 0 - val ackTimeoutMs = 0 - var data = new Array[TopicData](1) - var partitionData = new Array[PartitionData](1) - partitionData(0) = new PartitionData(partition,message.underlying) - data(0) = new TopicData(topic,partitionData) - val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data) - pr + val data = topics.flatMap(topic => + partitions.map(partition => (TopicAndPartition(topic, partition), new PartitionData(partition, message))) + ) + new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*)) } def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {