diff --git contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
index 5166358..09b1264 100644
--- contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
+++ contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
@@ -26,10 +26,10 @@ import java.util.Random;
 import kafka.etl.KafkaETLKey;
 import kafka.etl.KafkaETLRequest;
 import kafka.etl.Props;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.javaapi.producer.SyncProducer;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
 import kafka.message.Message;
-import kafka.producer.SyncProducerConfig;
+import kafka.producer.ProducerConfig;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -47,7 +47,7 @@ public class DataGenerator {
                                             System.currentTimeMillis());
 
     protected Props _props;
-    protected SyncProducer _producer = null;
+    protected Producer _producer = null;
     protected URI _uri = null;
     protected String _topic;
     protected int _count;
@@ -70,12 +70,12 @@ public class DataGenerator {
 
        System.out.println("server uri:" + _uri.toString());
        Properties producerProps = new Properties();
-       producerProps.put("host", _uri.getHost());
-       producerProps.put("port", String.valueOf(_uri.getPort()));
+       producerProps.put("broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
        producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE));
        producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
        producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
-       _producer = new SyncProducer(new SyncProducerConfig(producerProps));
+
+       _producer = new Producer(new ProducerConfig(producerProps));
 
     }
 
@@ -91,7 +91,8 @@ public class DataGenerator {
        }
        // send events
        System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
-       _producer.send(_topic, new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, list));
+       ProducerData pd = new ProducerData(_topic, null, list);
+       _producer.send(pd);
 
        // close the producer
        _producer.close();
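The hadoop-consumer's DataGenerator moves from the low-level SyncProducer to the high-level producer API. A rough sketch of the new call sequence in Scala (the broker address, topic, payload, and type parameters are illustrative, not taken from the patch):

    import java.util.Properties
    import kafka.javaapi.producer.{Producer, ProducerData}
    import kafka.message.Message
    import kafka.producer.ProducerConfig

    val props = new Properties()
    // one "broker.list" (host:port) entry replaces the old separate "host"/"port" properties
    props.put("broker.list", "localhost:9092")
    val producer = new Producer[AnyRef, Message](new ProducerConfig(props))
    // a null key, as in DataGenerator above, leaves partition selection to the producer
    val data = new ProducerData[AnyRef, Message]("test-topic", null,
      java.util.Arrays.asList(new Message("hello".getBytes)))
    producer.send(data)
    producer.close()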
diff --git core/src/main/scala/kafka/api/FetchRequest.scala core/src/main/scala/kafka/api/FetchRequest.scala
index 7e1fa47..3cac029 100644
--- core/src/main/scala/kafka/api/FetchRequest.scala
+++ core/src/main/scala/kafka/api/FetchRequest.scala
@@ -18,60 +18,13 @@ package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.utils.Utils
-import scala.collection.mutable.{HashMap, Buffer, ListBuffer}
-import kafka.common.{KafkaException, FetchRequestFormatException}
+import kafka.utils.{nonthreadsafe, Utils}
+import scala.collection.immutable.Map
+import kafka.common.TopicPartition
 
-object OffsetDetail {
-  def readFrom(buffer: ByteBuffer): OffsetDetail = {
-    val topic = Utils.readShortString(buffer, "UTF-8")
+case class PartitionFetchInfo(offset: Long, fetchSize: Int)
 
-    val partitionsCount = buffer.getInt
-    val partitions = new Array[Int](partitionsCount)
-    for (i <- 0 until partitions.length)
-      partitions(i) = buffer.getInt
-
-    val offsetsCount = buffer.getInt
-    val offsets = new Array[Long](offsetsCount)
-    for (i <- 0 until offsets.length)
-      offsets(i) = buffer.getLong
-
-    val fetchesCount = buffer.getInt
-    val fetchSizes = new Array[Int](fetchesCount)
-    for (i <- 0 until fetchSizes.length)
-      fetchSizes(i) = buffer.getInt
-
-    new OffsetDetail(topic, partitions, offsets, fetchSizes)
-  }
-
-}
-
-case class OffsetDetail(topic: String, partitions: Seq[Int], offsets: Seq[Long], fetchSizes: Seq[Int]) {
-
-  def writeTo(buffer: ByteBuffer) {
-    Utils.writeShortString(buffer, topic, "UTF-8")
-
-    if(partitions.size > Int.MaxValue || offsets.size > Int.MaxValue || fetchSizes.size > Int.MaxValue)
-      throw new KafkaException("Number of fetches in FetchRequest exceeds " + Int.MaxValue + ".")
-
-    buffer.putInt(partitions.length)
-    partitions.foreach(buffer.putInt(_))
-
-    buffer.putInt(offsets.length)
-    offsets.foreach(buffer.putLong(_))
-
-    buffer.putInt(fetchSizes.length)
-    fetchSizes.foreach(buffer.putInt(_))
-  }
-
-  def sizeInBytes(): Int = {
-    2 + topic.length() +                      // topic string
-    partitions.foldLeft(4)((s, _) => s + 4) + // each request partition (int)
-    offsets.foldLeft(4)((s, _) => s + 8) +    // each request offset (long)
-    fetchSizes.foldLeft(4)((s,_) => s + 4)    // each request fetch size
-  }
-}
 
 object FetchRequest {
   val CurrentVersion = 1.shortValue()
@@ -85,18 +38,23 @@ object FetchRequest {
   def readFrom(buffer: ByteBuffer): FetchRequest = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val clientId = Utils.readShortString(buffer, "UTF-8")
+    val clientId = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
     val replicaId = buffer.getInt
     val maxWait = buffer.getInt
     val minBytes = buffer.getInt
-    val offsetsCount = buffer.getInt
-    val offsetInfo = new Array[OffsetDetail](offsetsCount)
-    for(i <- 0 until offsetInfo.length)
-      offsetInfo(i) = OffsetDetail.readFrom(buffer)
-
-    new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetInfo)
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        val offset = buffer.getLong
+        val fetchSize = buffer.getInt
+        (TopicPartition(topic, partitionId), PartitionFetchInfo(offset, fetchSize))
+      })
+    })
+    FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, Map(pairs:_*))
   }
-
 }
 
 case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
@@ -105,48 +63,61 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                         replicaId: Int = FetchRequest.DefaultReplicaId,
                         maxWait: Int = FetchRequest.DefaultMaxWait,
                         minBytes: Int = FetchRequest.DefaultMinBytes,
-                        offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.Fetch)) {
-
-  // ensure that a topic "X" appears in at most one OffsetDetail
-  def validate() {
-    if(offsetInfo == null)
-      throw new FetchRequestFormatException("FetchRequest has null offsetInfo")
-
-    // We don't want to get fancy with groupBy's and filter's since we just want the first occurrence
-    var topics = Set[String]()
-    val iter = offsetInfo.iterator
-    while(iter.hasNext) {
-      val offsetData = iter.next()
-      val topic = offsetData.topic
-      if(topics.contains(topic))
-        throw new FetchRequestFormatException("FetchRequest has multiple OffsetDetails for topic: " + topic)
-      else
-        topics += topic
-    }
-  }
+                        requestInfo: Map[TopicPartition, PartitionFetchInfo])
+        extends RequestOrResponse(Some(RequestKeys.Fetch)) {
 
-  def writeTo(buffer: ByteBuffer) {
-    // validate first
-    validate()
+  /**
+   * Partitions the request info into a map of maps (one for each topic).
+   */
+  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 
+  def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
-    Utils.writeShortString(buffer, clientId, "UTF-8")
+    Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
     buffer.putInt(replicaId)
     buffer.putInt(maxWait)
     buffer.putInt(minBytes)
-    buffer.putInt(offsetInfo.size)
-    for(topicDetail <- offsetInfo) {
-      topicDetail.writeTo(buffer)
+    buffer.putInt(requestInfoGroupedByTopic.size) // topic count
+    requestInfoGroupedByTopic.foreach {
+      case (topic, partitionFetchInfos) =>
+        Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
+        buffer.putInt(partitionFetchInfos.size) // partition count
+        partitionFetchInfos.foreach {
+          case (TopicPartition(_, partition), PartitionFetchInfo(offset, fetchSize)) =>
+            buffer.putInt(partition)
+            buffer.putLong(offset)
+            buffer.putInt(fetchSize)
+        }
     }
   }
 
-  def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
+  def sizeInBytes: Int = {
+    2 + /* versionId */
+    4 + /* correlationId */
+    Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) +
+    4 + /* replicaId */
+    4 + /* maxWait */
+    4 + /* minBytes */
+    4 + /* topic count */
+    requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
+      val (topic, partitionFetchInfos) = currTopic
+      foldedTopics +
+      Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+      4 + /* partition count */
+      partitionFetchInfos.size * (
+        4 + /* partition id */
+        8 + /* offset */
+        4 /* fetch size */
+      )
+    })
+  }
 
-  def numPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size)
+  def numPartitions = requestInfo.size
 }
 
+@nonthreadsafe
 class FetchRequestBuilder() {
   private var correlationId = FetchRequest.DefaultCorrelationId
   private val versionId = FetchRequest.CurrentVersion
@@ -154,13 +125,10 @@ class FetchRequestBuilder() {
   private var replicaId = FetchRequest.DefaultReplicaId
   private var maxWait = FetchRequest.DefaultMaxWait
   private var minBytes = FetchRequest.DefaultMinBytes
-  private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]]
+  private val requestMap = new collection.mutable.HashMap[TopicPartition, PartitionFetchInfo]
 
   def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
-    val topicData = requestMap.getOrElseUpdate(topic, (ListBuffer[Int](), ListBuffer[Long](), ListBuffer[Int]()))
-    topicData._1.append(partition)
-    topicData._2.append(offset)
-    topicData._3.append(fetchSize)
+    requestMap.put(TopicPartition(topic, partition), PartitionFetchInfo(offset, fetchSize))
     this
   }
 
@@ -189,10 +157,5 @@ class FetchRequestBuilder() {
     this
   }
 
-  def build() = {
-    val offsetDetails = requestMap.map{ topicData =>
-      new OffsetDetail(topicData._1, topicData._2._1.toArray, topicData._2._2.toArray, topicData._2._3.toArray)
-    }
-    new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetDetails.toArray[OffsetDetail])
-  }
+  def build() = FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
 }
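With OffsetDetail gone, a fetch request is simply a Map[TopicPartition, PartitionFetchInfo]; grouping by topic happens only at (de)serialization time via requestInfoGroupedByTopic. A small usage sketch of the builder above (topic names, offsets, and fetch sizes are made up):

    import kafka.api.FetchRequestBuilder
    import kafka.common.TopicPartition

    val request = new FetchRequestBuilder()
      .addFetch("topic-a", 0, 0L, 1024 * 1024)
      .addFetch("topic-a", 1, 500L, 1024 * 1024)
      .addFetch("topic-b", 0, 42L, 64 * 1024)
      .build()
    // entries are keyed by partition; a repeated addFetch for the same
    // TopicPartition overwrites the earlier PartitionFetchInfo
    val info = request.requestInfo(TopicPartition("topic-a", 1))  // PartitionFetchInfo(500, 1048576)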
diff --git core/src/main/scala/kafka/api/FetchResponse.scala core/src/main/scala/kafka/api/FetchResponse.scala
index 1d581d4..732cf5d 100644
--- core/src/main/scala/kafka/api/FetchResponse.scala
+++ core/src/main/scala/kafka/api/FetchResponse.scala
@@ -19,27 +19,35 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import java.nio.channels.GatheringByteChannel
-import kafka.common.ErrorMapping
+import kafka.common.{TopicPartition, ErrorMapping}
 import kafka.message.{MessageSet, ByteBufferMessageSet}
 import kafka.network.{MultiSend, Send}
 import kafka.utils.Utils
 
 object PartitionData {
   def readFrom(buffer: ByteBuffer): PartitionData = {
-    val error = buffer.getShort
     val partition = buffer.getInt
+    val error = buffer.getShort
     val initialOffset = buffer.getLong
-    val hw = buffer.getLong()
+    val hw = buffer.getLong
     val messageSetSize = buffer.getInt
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
     new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
   }
+
+  val headerSize =
+    4 + /* partition */
+    2 + /* error code */
+    8 + /* initialOffset */
+    8 + /* high watermark */
+    4 /* messageSetSize */
 }
 
 case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
-  val sizeInBytes = 4 + 2 + 8 + 4 + messages.sizeInBytes.intValue() + 8
+
+  val sizeInBytes = PartitionData.headerSize + messages.sizeInBytes.intValue()
 
   def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
 }
@@ -50,17 +58,17 @@ class PartitionDataSend(val partitionData: PartitionData) extends Send {
   private val messageSize = partitionData.messages.sizeInBytes
   private var messagesSentSize = 0L
 
-  private val buffer = ByteBuffer.allocate(26)
-  buffer.putShort(partitionData.error)
+  private val buffer = ByteBuffer.allocate(PartitionData.headerSize)
   buffer.putInt(partitionData.partition)
+  buffer.putShort(partitionData.error)
   buffer.putLong(partitionData.initialOffset)
   buffer.putLong(partitionData.hw)
   buffer.putInt(partitionData.messages.sizeInBytes.intValue())
   buffer.rewind()
 
-  def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
+  override def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
 
-  def writeTo(channel: GatheringByteChannel): Int = {
+  override def writeTo(channel: GatheringByteChannel): Int = {
     var written = 0
     if(buffer.hasRemaining)
       written += channel.write(buffer)
@@ -75,63 +83,43 @@ class PartitionDataSend(val partitionData: PartitionData) extends Send {
 
 object TopicData {
   def readFrom(buffer: ByteBuffer): TopicData = {
-    val topic = Utils.readShortString(buffer, "UTF-8")
+    val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
     val partitionCount = buffer.getInt
-    val partitions = new Array[PartitionData](partitionCount)
-    for(i <- 0 until partitionCount)
-      partitions(i) = PartitionData.readFrom(buffer)
-    new TopicData(topic, partitions.sortBy(_.partition))
+    val topicPartitionDataPairs = (1 to partitionCount).map(_ => {
+      val partitionData = PartitionData.readFrom(buffer)
+      (TopicPartition(topic, partitionData.partition), partitionData)
+    })
+    TopicData(topic, Map(topicPartitionDataPairs:_*))
   }
 
-  def findPartition(data: Array[PartitionData], partition: Int): Option[PartitionData] = {
-    if(data == null || data.size == 0)
-      return None
-
-    var (low, high) = (0, data.size-1)
-    while(low <= high) {
-      val mid = (low + high) / 2
-      val found = data(mid)
-      if(found.partition == partition)
-        return Some(found)
-      else if(partition < found.partition)
-        high = mid - 1
-      else
-        low = mid + 1
-    }
-    None
-  }
+  def headerSize(topic: String) =
+    Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+    4 /* partition count */
 }
 
-case class TopicData(topic: String, partitionDataArray: Array[PartitionData]) {
-  val sizeInBytes = 2 + topic.length + partitionDataArray.foldLeft(4)(_ + _.sizeInBytes)
+case class TopicData(topic: String, partitionData: Map[TopicPartition, PartitionData]) {
+  val sizeInBytes =
+    TopicData.headerSize(topic) + partitionData.values.foldLeft(0)(_ + _.sizeInBytes)
 
-  // need to override equals due to brokern java-arrays equals functionality
-  override def equals(other: Any): Boolean = {
-    other match {
-      case that: TopicData =>
-        ( topic == that.topic &&
-          partitionDataArray.toSeq == that.partitionDataArray.toSeq )
-      case _ => false
-    }
-  }
+  val headerSize = TopicData.headerSize(topic)
 }
 
 class TopicDataSend(val topicData: TopicData) extends Send {
-  val size = topicData.sizeInBytes
+  private val size = topicData.sizeInBytes
 
-  var sent = 0
+  private var sent = 0
 
-  private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4)
-  Utils.writeShortString(buffer, topicData.topic, "UTF-8")
-  buffer.putInt(topicData.partitionDataArray.length)
+  override def complete = sent >= size
+
+  private val buffer = ByteBuffer.allocate(topicData.headerSize)
+  Utils.writeShortString(buffer, topicData.topic, RequestOrResponse.DefaultCharset)
+  buffer.putInt(topicData.partitionData.size)
   buffer.rewind()
 
-  val sends = new MultiSend(topicData.partitionDataArray.map(new PartitionDataSend(_)).toList) {
-    val expectedBytesToWrite = topicData.partitionDataArray.foldLeft(0)(_ + _.sizeInBytes)
+  val sends = new MultiSend(topicData.partitionData.toList.map(d => new PartitionDataSend(d._2))) {
+    val expectedBytesToWrite = topicData.sizeInBytes - topicData.headerSize
   }
 
-  def complete = sent >= size
-
   def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete()
     var written = 0
@@ -146,46 +134,59 @@ class TopicDataSend(val topicData: TopicData) extends Send {
 }
 
+object FetchResponse {
+  val headerSize =
+    2 + /* versionId */
+    4 + /* correlationId */
+    4 /* topic count */
 
-object FetchResponse {
   def readFrom(buffer: ByteBuffer): FetchResponse = {
     val versionId = buffer.getShort
-    val errorCode = buffer.getShort
     val correlationId = buffer.getInt
-    val dataCount = buffer.getInt
-    val data = new Array[TopicData](dataCount)
-    for(i <- 0 until data.length)
-      data(i) = TopicData.readFrom(buffer)
-    new FetchResponse(versionId, correlationId, data, errorCode)
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topicData = TopicData.readFrom(buffer)
+      topicData.partitionData.values.map(
+        partitionData => (TopicPartition(topicData.topic, partitionData.partition), partitionData)
+      )
+    })
+    FetchResponse(versionId, correlationId, Map(pairs:_*))
   }
 }
 
 case class FetchResponse(versionId: Short,
                          correlationId: Int,
-                         data: Array[TopicData],
-                         errorCode: Short = ErrorMapping.NoError) {
-
-  val sizeInBytes = 2 + 4 + 2 + data.foldLeft(4)(_ + _.sizeInBytes)
-
-  lazy val topicMap = data.groupBy(_.topic).mapValues(_.head)
-
-  def messageSet(topic: String, partition: Int): ByteBufferMessageSet = {
-    val messageSet = topicMap.get(topic) match {
-      case Some(topicData) =>
-        TopicData.findPartition(topicData.partitionDataArray, partition).map(_.messages).getOrElse(MessageSet.Empty)
-      case None =>
-        MessageSet.Empty
-    }
-    messageSet.asInstanceOf[ByteBufferMessageSet]
-  }
-
-  def highWatermark(topic: String, partition: Int): Long = {
-    topicMap.get(topic) match {
-      case Some(topicData) =>
-        TopicData.findPartition(topicData.partitionDataArray, partition).map(_.hw).getOrElse(-1L)
-      case None => -1L
+                         data: Map[TopicPartition, PartitionData]) {
+
+  /**
+   * Partitions the data into a map of maps (one for each topic).
+   */
+  lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
+
+  val sizeInBytes =
+    FetchResponse.headerSize +
+    dataGroupedByTopic.foldLeft(0) ((folded, curr) => {
+      val topicData = TopicData(curr._1, curr._2)
+      folded +
+      topicData.sizeInBytes
+    })
+
+  def messageSet(topic: String, partition: Int): ByteBufferMessageSet = data.get(TopicPartition(topic, partition))
+    .map(_.messages).getOrElse(MessageSet.Empty).asInstanceOf[ByteBufferMessageSet]
+
+  def highWatermark(topic: String, partition: Int) =
+    data.get(TopicPartition(topic, partition)).map(_.hw).getOrElse(-1L)
+
+  def hasError = data.values.exists(_.error != ErrorMapping.NoError)
+
+  def errorCode(topic: String, partition: Int) = {
+    val topicPartition = TopicPartition(topic, partition)
+    data.get(topicPartition) match {
+      case Some(partitionData) => partitionData.error
+      case _ =>
+        throw new IllegalArgumentException("No partition %s in fetch response %s".format(topicPartition, this.toString))
     }
   }
 }
@@ -193,21 +194,25 @@ case class FetchResponse(versionId: Short,
 
 class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
   private val size = fetchResponse.sizeInBytes
+
   private var sent = 0
-
-  private val buffer = ByteBuffer.allocate(16)
+
+  private val sendSize = 4 /* for size */ + size
+
+  override def complete = sent >= sendSize
+
+  private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize)
   buffer.putInt(size)
   buffer.putShort(fetchResponse.versionId)
-  buffer.putShort(fetchResponse.errorCode)
   buffer.putInt(fetchResponse.correlationId)
-  buffer.putInt(fetchResponse.data.length)
+  buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count
   buffer.rewind()
-
-  val sends = new MultiSend(fetchResponse.data.map(new TopicDataSend(_)).toList) {
-    val expectedBytesToWrite = fetchResponse.data.foldLeft(0)(_ + _.sizeInBytes)
-  }
 
-  def complete = sent >= sendSize
+  val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map {
+    case(topic, data) => new TopicDataSend(TopicData(topic, data))
+  }) {
+    val expectedBytesToWrite = fetchResponse.sizeInBytes - FetchResponse.headerSize
+  }
 
   def writeTo(channel: GatheringByteChannel):Int = {
     expectIncomplete()
@@ -220,6 +225,5 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
     sent += written
     written
   }
-
-  def sendSize = 4 + fetchResponse.sizeInBytes
 }
+
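Because the per-partition error code now lives in PartitionData, callers interrogate a fetch response per (topic, partition) instead of consulting one response-wide errorCode. A sketch, assuming a response: FetchResponse is in scope:

    if (response.hasError) {
      // errorCode throws IllegalArgumentException for a partition absent from the response
      println("fetch failed for topic-a/0 with code " + response.errorCode("topic-a", 0))
    } else {
      val messages = response.messageSet("topic-a", 0)
      val hw = response.highWatermark("topic-a", 0)
      println("fetched %d bytes, high watermark %d".format(messages.sizeInBytes, hw))
    }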
diff --git core/src/main/scala/kafka/api/ProducerRequest.scala core/src/main/scala/kafka/api/ProducerRequest.scala
index d7767b5..526edfd 100644
--- core/src/main/scala/kafka/api/ProducerRequest.scala
+++ core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -20,6 +20,8 @@ package kafka.api
 import java.nio._
 import kafka.message._
 import kafka.utils._
+import scala.collection.Map
+import kafka.common.TopicPartition
 
 
 object ProducerRequest {
@@ -28,88 +30,93 @@ object ProducerRequest {
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
     val versionId: Short = buffer.getShort
     val correlationId: Int = buffer.getInt
-    val clientId: String = Utils.readShortString(buffer, "UTF-8")
+    val clientId: String = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
     val requiredAcks: Short = buffer.getShort
     val ackTimeoutMs: Int = buffer.getInt
     //build the topic structure
     val topicCount = buffer.getInt
-    val data = new Array[TopicData](topicCount)
-    for(i <- 0 until topicCount) {
-      val topic = Utils.readShortString(buffer, "UTF-8")
-
+    val partitionDataPairs = (1 to topicCount).flatMap(_ => {
+      // process topic
+      val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
       val partitionCount = buffer.getInt
-      //build the partition structure within this topic
-      val partitionData = new Array[PartitionData](partitionCount)
-      for (j <- 0 until partitionCount) {
+      (1 to partitionCount).map(_ => {
         val partition = buffer.getInt
         val messageSetSize = buffer.getInt
         val messageSetBuffer = new Array[Byte](messageSetSize)
         buffer.get(messageSetBuffer,0,messageSetSize)
-        partitionData(j) = new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
-      }
-      data(i) = new TopicData(topic,partitionData)
-    }
-    new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
+        (TopicPartition(topic, partition),
+         new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))))
+      })
+    })
+
+    ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, Map(partitionDataPairs:_*))
   }
 }
 
-case class ProducerRequest( versionId: Short,
+case class ProducerRequest( versionId: Short = ProducerRequest.CurrentVersion,
                             correlationId: Int,
                             clientId: String,
                             requiredAcks: Short,
                             ackTimeoutMs: Int,
-                            data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) {
+                            data: Map[TopicPartition, PartitionData])
+        extends RequestOrResponse(Some(RequestKeys.Produce)) {
 
-  def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) =
+  /**
+   * Partitions the data into a map of maps (one for each topic).
+   */
+  private lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
+
+  def this(correlationId: Int,
+           clientId: String,
+           requiredAcks: Short,
+           ackTimeoutMs: Int,
+           data: Map[TopicPartition, PartitionData]) =
    this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
-    Utils.writeShortString(buffer, clientId, "UTF-8")
+    Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
     buffer.putShort(requiredAcks)
     buffer.putInt(ackTimeoutMs)
+
     //save the topic structure
-    buffer.putInt(data.size) //the number of topics
-    for(topicData <- data) {
-      Utils.writeShortString(buffer, topicData.topic, "UTF-8") //write the topic
-      buffer.putInt(topicData.partitionDataArray.size) //the number of partitions
-      for(partitionData <- topicData.partitionDataArray) {
-        buffer.putInt(partitionData.partition)
-        buffer.putInt(partitionData.messages.getSerialized().limit)
-        buffer.put(partitionData.messages.getSerialized())
-        partitionData.messages.getSerialized().rewind
-      }
+    buffer.putInt(dataGroupedByTopic.size) //the number of topics
+    dataGroupedByTopic.foreach {
+      case (topic, topicAndPartitionData) =>
+        Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) //write the topic
+        buffer.putInt(topicAndPartitionData.size) //the number of partitions
+        topicAndPartitionData.foreach(partitionAndData => {
+          val partitionData = partitionAndData._2
+          buffer.putInt(partitionData.partition)
+          buffer.putInt(partitionData.messages.getSerialized().limit)
+          buffer.put(partitionData.messages.getSerialized())
+          partitionData.messages.getSerialized().rewind
+        })
     }
   }
 
-  def sizeInBytes(): Int = {
-    var size = 0
-    //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
-    size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4
-    for(topicData <- data) {
-      size += 2 + topicData.topic.length + 4
-      for(partitionData <- topicData.partitionDataArray) {
-        size += 4 + 4 + partitionData.messages.sizeInBytes.asInstanceOf[Int]
+  def sizeInBytes: Int = {
+    2 + /* versionId */
+    4 + /* correlationId */
+    Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + /* client id */
+    2 + /* requiredAcks */
+    4 + /* ackTimeoutMs */
+    4 + /* number of topics */
+    dataGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
+      foldedTopics +
+      Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
+      4 + /* the number of partitions */
+      {
+        currTopic._2.foldLeft(0)((foldedPartitions, currPartition) => {
+          foldedPartitions +
+          4 + /* partition id */
+          4 + /* byte-length of serialized messages */
+          currPartition._2.messages.sizeInBytes.toInt
+        })
       }
-    }
-    size
+    })
   }
 
-  // need to override case-class equals due to broken java-array equals()
-  override def equals(other: Any): Boolean = {
-    other match {
-      case that: ProducerRequest =>
-        ( correlationId == that.correlationId &&
-          clientId == that.clientId &&
-          requiredAcks == that.requiredAcks &&
-          ackTimeoutMs == that.ackTimeoutMs &&
-          data.toSeq == that.data.toSeq )
-      case _ => false
-    }
-  }
-
-  def topicPartitionCount = data.foldLeft(0)(_ + _.partitionDataArray.length)
-
 }
diff --git core/src/main/scala/kafka/api/ProducerResponse.scala core/src/main/scala/kafka/api/ProducerResponse.scala
index dc110e0..08df36c 100644
--- core/src/main/scala/kafka/api/ProducerResponse.scala
+++ core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -18,57 +18,81 @@ package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.common.ErrorMapping
+import kafka.utils.Utils
+import scala.collection.Map
+import kafka.common.{TopicPartition, ErrorMapping}
 
 object ProducerResponse {
   def readFrom(buffer: ByteBuffer): ProducerResponse = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val errorCode = buffer.getShort
-    val errorsSize = buffer.getInt
-    val errors = new Array[Short](errorsSize)
-    for( i <- 0 until errorsSize) {
-      errors(i) = buffer.getShort
-    }
-    val offsetsSize = buffer.getInt
-    val offsets = new Array[Long](offsetsSize)
-    for( i <- 0 until offsetsSize) {
-      offsets(i) = buffer.getLong
-    }
-    new ProducerResponse(versionId, correlationId, errors, offsets, errorCode)
+    val topicCount = buffer.getInt
+    val statusPairs = (1 to topicCount).flatMap(_ => {
+      val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partition = buffer.getInt
+        val error = buffer.getShort
+        val offset = buffer.getLong
+        (TopicPartition(topic, partition), ProducerResponseStatus(error, offset))
+      })
+    })
+
+    ProducerResponse(versionId, correlationId, Map(statusPairs:_*))
   }
 }
 
-case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short],
-                            offsets: Array[Long], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
-  val sizeInBytes = 2 + 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length)
+case class ProducerResponseStatus(error: Short, nextOffset: Long)
+
+
+case class ProducerResponse(versionId: Short,
                             correlationId: Int,
+                            status: Map[TopicPartition, ProducerResponseStatus]) extends RequestOrResponse {
+
+  /**
+   * Partitions the status map into a map of maps (one for each topic).
+   */
+  private lazy val statusGroupedByTopic = status.groupBy(_._1.topic)
+
+  def hasError = status.values.exists(_.error != ErrorMapping.NoError)
+
+  val sizeInBytes = {
+    val groupedStatus = statusGroupedByTopic
+    2 + /* version id */
+    4 + /* correlation id */
+    4 + /* topic count */
+    groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
+      foldedTopics +
+      Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
+      4 + /* partition count for this topic */
+      currTopic._2.foldLeft (0) ((foldedPartitions, currPartition) => {
+        foldedPartitions +
+        4 + /* partition id */
+        2 + /* error code */
+        8 /* offset */
+      })
+    })
+  }
 
   def writeTo(buffer: ByteBuffer) {
-    /* version id */
+    val groupedStatus = statusGroupedByTopic
+
     buffer.putShort(versionId)
-    /* correlation id */
     buffer.putInt(correlationId)
-    /* error code */
-    buffer.putShort(errorCode)
-    /* errors */
-    buffer.putInt(errors.length)
-    errors.foreach(buffer.putShort(_))
-    /* offsets */
-    buffer.putInt(offsets.length)
-    offsets.foreach(buffer.putLong(_))
-  }
+    buffer.putInt(groupedStatus.size) // topic count
 
-  // need to override case-class equals due to broken java-array equals()
-  override def equals(other: Any): Boolean = {
-    other match {
-      case that: ProducerResponse =>
-        ( correlationId == that.correlationId &&
-          versionId == that.versionId &&
-          errorCode == that.errorCode &&
-          errors.toSeq == that.errors.toSeq &&
-          offsets.toSeq == that.offsets.toSeq)
-      case _ => false
-    }
+    groupedStatus.foreach(topicStatus => {
+      val (topic, errorsAndOffsets) = topicStatus
+      Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
+      buffer.putInt(errorsAndOffsets.size) // partition count
+      errorsAndOffsets.foreach {
+        case((TopicPartition(_, partition), ProducerResponseStatus(error, nextOffset))) =>
+          buffer.putInt(partition)
+          buffer.putShort(error)
+          buffer.putLong(nextOffset)
+      }
+    })
   }
-}
\ No newline at end of file
+}
+
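ProducerResponse likewise replaces the parallel errors/offsets arrays with a single status map, so per-partition outcomes can be read off directly. A sketch, assuming a response: ProducerResponse is in scope:

    import kafka.api.ProducerResponseStatus
    import kafka.common.{ErrorMapping, TopicPartition}

    if (response.hasError)
      response.status.foreach {
        case (TopicPartition(topic, partition), ProducerResponseStatus(error, nextOffset)) =>
          if (error != ErrorMapping.NoError)
            println("send to %s/%d failed with error code %d".format(topic, partition, error))
          else
            println("%s/%d acked, next offset %d".format(topic, partition, nextOffset))
      }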
diff --git core/src/main/scala/kafka/api/RequestOrResponse.scala core/src/main/scala/kafka/api/RequestOrResponse.scala
index ac5b64e..611bb42 100644
--- core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -19,6 +19,12 @@ package kafka.api
 
 import java.nio._
 
+
+object RequestOrResponse {
+  val DefaultCharset = "UTF-8"
+}
+
+
 private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
 
   def sizeInBytes: Int
diff --git core/src/main/scala/kafka/common/ErrorMapping.scala core/src/main/scala/kafka/common/ErrorMapping.scala
index 54312c0..d3f69bb 100644
--- core/src/main/scala/kafka/common/ErrorMapping.scala
+++ core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -34,12 +34,11 @@ object ErrorMapping {
   val InvalidMessageCode : Short = 2
   val UnknownTopicOrPartitionCode : Short = 3
   val InvalidFetchSizeCode : Short = 4
-  val InvalidFetchRequestFormatCode : Short = 5
-  val LeaderNotAvailableCode : Short = 6
-  val NotLeaderForPartitionCode : Short = 7
-  val RequestTimedOutCode: Short = 8
-  val BrokerNotExistInZookeeperCode: Short = 9
-  val ReplicaNotAvailableCode: Short = 10
+  val LeaderNotAvailableCode : Short = 5
+  val NotLeaderForPartitionCode : Short = 6
+  val RequestTimedOutCode: Short = 7
+  val BrokerNotExistInZookeeperCode: Short = 8
+  val ReplicaNotAvailableCode: Short = 9
 
   private val exceptionToCode = Map[Class[Throwable], Short](
@@ -47,7 +46,6 @@ object ErrorMapping {
     classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
     classOf[UnknownTopicOrPartitionException].asInstanceOf[Class[Throwable]] -> UnknownTopicOrPartitionCode,
     classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
-    classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
    classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
    classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
    classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
diff --git core/src/main/scala/kafka/common/FetchRequestFormatException.scala core/src/main/scala/kafka/common/FetchRequestFormatException.scala
deleted file mode 100644
index 0bc7d4e..0000000
--- core/src/main/scala/kafka/common/FetchRequestFormatException.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.common
-
-class FetchRequestFormatException(val message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
diff --git core/src/main/scala/kafka/common/TopicPartition.scala core/src/main/scala/kafka/common/TopicPartition.scala
new file mode 100644
index 0000000..717f49d
--- /dev/null
+++ core/src/main/scala/kafka/common/TopicPartition.scala
@@ -0,0 +1,29 @@
+package kafka.common
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Convenience case class since (topic, partition) pairs are ubiquitous.
+ */
+case class TopicPartition(topic: String, partitionId: Int) {
+
+  def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
+
+  def asTuple = (topic, partitionId)
+}
+
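TopicPartition becomes the key type everywhere the rest of the patch previously passed raw (String, Int) tuples around, e.g. as a hash-set or hash-map key (values below are hypothetical):

    import kafka.common.TopicPartition
    import scala.collection.mutable

    val noLeaderPartitions = new mutable.HashSet[TopicPartition]
    noLeaderPartitions += TopicPartition("events", 3)
    noLeaderPartitions.head.asTuple      // ("events", 3)
    new TopicPartition(("events", 3))    // auxiliary constructor from a tuple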
diff --git core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index a94906d..fabce25 100644
--- core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
+import kafka.common.TopicPartition
 
 
 /**
@@ -38,7 +39,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) {
   private var partitionMap: immutable.Map[(String, Int), PartitionTopicInfo] = null
   private var cluster: Cluster = null
-  private val noLeaderPartitionSet = new mutable.HashSet[(String, Int)]
+  private val noLeaderPartitionSet = new mutable.HashSet[TopicPartition]
   private val lock = new ReentrantLock
   private val cond = lock.newCondition()
   private val leaderFinderThread = new ShutdownableThread(consumerIdString + "-leader-finder-thread"){
@@ -48,21 +49,22 @@ class ConsumerFetcherManager(private val consumerIdString: String,
       try {
         if (noLeaderPartitionSet.isEmpty)
           cond.await()
-        for ((topic, partitionId) <- noLeaderPartitionSet) {
-          // find the leader for this partition
-          getLeaderForPartition(zkClient, topic, partitionId) match {
-            case Some(leaderId) =>
-              cluster.getBroker(leaderId) match {
-                case Some(broker) =>
-                  val pti = partitionMap((topic, partitionId))
-                  addFetcher(topic, partitionId, pti.getFetchOffset(), broker)
-                  noLeaderPartitionSet.remove((topic, partitionId))
-                case None =>
-                  error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started"
-                        .format(leaderId, topic, partitionId))
-              }
-            case None => // let it go since we will keep retrying
-          }
+        noLeaderPartitionSet.foreach {
+          case(TopicPartition(topic, partitionId)) =>
+            // find the leader for this partition
+            getLeaderForPartition(zkClient, topic, partitionId) match {
+              case Some(leaderId) =>
+                cluster.getBroker(leaderId) match {
+                  case Some(broker) =>
+                    val pti = partitionMap((topic, partitionId))
+                    addFetcher(topic, partitionId, pti.getFetchOffset(), broker)
+                    noLeaderPartitionSet.remove(TopicPartition(topic, partitionId))
+                  case None =>
+                    error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started"
+                          .format(leaderId, topic, partitionId))
+                }
+              case None => // let it go since we will keep retrying
+            }
         }
       } finally {
         lock.unlock()
@@ -84,7 +86,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     try {
       partitionMap = topicInfos.map(tpi => ((tpi.topic, tpi.partitionId), tpi)).toMap
       this.cluster = cluster
-      noLeaderPartitionSet ++= topicInfos.map(tpi => (tpi.topic, tpi.partitionId))
+      noLeaderPartitionSet ++= topicInfos.map(tpi => TopicPartition(tpi.topic, tpi.partitionId))
      cond.signalAll()
     } finally {
       lock.unlock()
@@ -117,7 +119,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     pti
   }
 
-  def addPartitionsWithError(partitionList: Iterable[(String, Int)]) {
+  def addPartitionsWithError(partitionList: Iterable[TopicPartition]) {
     debug("adding partitions with error %s".format(partitionList))
     lock.lock()
     try {
diff --git core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index f13f449..2026aab 100644
--- core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -21,6 +21,8 @@ import kafka.cluster.Broker
 import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{FetchRequest, OffsetRequest, PartitionData}
+import kafka.common.TopicPartition
+
 
 class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
@@ -57,7 +59,7 @@ class ConsumerFetcherThread(name: String,
   }
 
   // any logic for partitions whose leader has changed
-  def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {
+  def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) {
     consumerFetcherManager.addPartitionsWithError(partitions)
   }
 }
diff --git core/src/main/scala/kafka/javaapi/FetchRequest.scala core/src/main/scala/kafka/javaapi/FetchRequest.scala
new file mode 100644
index 0000000..b9e9f38
--- /dev/null
+++ core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import scala.collection.JavaConversions
+import kafka.api.PartitionFetchInfo
+import java.nio.ByteBuffer
+import kafka.common.TopicPartition
+
+
+class FetchRequest(correlationId: Int,
+                   clientId: String,
+                   replicaId: Int,
+                   maxWait: Int,
+                   minBytes: Int,
+                   requestInfo: java.util.Map[TopicPartition, PartitionFetchInfo]) {
+
+  val underlying = {
+    val scalaMap = JavaConversions.asMap(requestInfo).toMap
+    kafka.api.FetchRequest(
+      correlationId = correlationId,
+      clientId = clientId,
+      replicaId = replicaId,
+      maxWait = maxWait,
+      minBytes = minBytes,
+      requestInfo = scalaMap
+    )
+  }
+
+  def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
+
+  def sizeInBytes = underlying.sizeInBytes
+
+  override def toString = underlying.toString
+
+  override def equals(other: Any) = canEqual(other) && {
+    val otherFetchRequest = other.asInstanceOf[kafka.javaapi.FetchRequest]
+    this.underlying.equals(otherFetchRequest.underlying)
+  }
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchRequest]
+
+  override def hashCode = underlying.hashCode
+
+}
+
diff --git core/src/main/scala/kafka/javaapi/FetchResponse.scala core/src/main/scala/kafka/javaapi/FetchResponse.scala
index 3d0928a..57e5a66 100644
--- core/src/main/scala/kafka/javaapi/FetchResponse.scala
+++ core/src/main/scala/kafka/javaapi/FetchResponse.scala
@@ -17,17 +17,33 @@
 
 package kafka.javaapi
 
-import kafka.api.TopicData
+import kafka.api.PartitionData
+import kafka.common.TopicPartition
 
 class FetchResponse( val versionId: Short,
                      val correlationId: Int,
-                     private val data: Array[TopicData] ) {
+                     private val data: Map[TopicPartition, PartitionData] ) {
 
-  private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data)
+  private val underlying = kafka.api.FetchResponse(versionId, correlationId, data)
 
   def messageSet(topic: String, partition: Int): kafka.javaapi.message.ByteBufferMessageSet = {
     import Implicits._
     underlying.messageSet(topic, partition)
   }
+
+  def highWatermark(topic: String, partition: Int) = underlying.highWatermark(topic, partition)
+
+  def hasError = underlying.hasError
+
+  def errorCode(topic: String, partition: Int) = underlying.errorCode(topic, partition)
+
+  override def equals(other: Any) = canEqual(other) && {
+    val otherFetchResponse = other.asInstanceOf[kafka.javaapi.FetchResponse]
+    this.underlying.equals(otherFetchResponse.underlying)
+  }
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchResponse]
+
+  override def hashCode = underlying.hashCode
 }
diff --git core/src/main/scala/kafka/javaapi/ProducerRequest.scala core/src/main/scala/kafka/javaapi/ProducerRequest.scala
index 77e07d7..790acb6 100644
--- core/src/main/scala/kafka/javaapi/ProducerRequest.scala
+++ core/src/main/scala/kafka/javaapi/ProducerRequest.scala
@@ -16,29 +16,42 @@
  */
 package kafka.javaapi
 
-import kafka.api.RequestOrResponse
-import kafka.api.{RequestKeys, TopicData}
 import java.nio.ByteBuffer
+import kafka.api.{PartitionData, RequestOrResponse, RequestKeys}
+import scala.collection.JavaConversions
+import kafka.common.TopicPartition
+
 
 class ProducerRequest(val correlationId: Int,
                       val clientId: String,
                       val requiredAcks: Short,
                       val ackTimeoutMs: Int,
-                      val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.Produce)) {
+                      val data: java.util.Map[TopicPartition, PartitionData])
+        extends RequestOrResponse(Some(RequestKeys.Produce)) {
 
-  val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
+  private val underlying = {
+    val scalaMap = JavaConversions.asMap(data).toMap
+    kafka.api.ProducerRequest(
+      correlationId = correlationId,
+      clientId = clientId,
+      requiredAcks = requiredAcks,
+      ackTimeoutMs = ackTimeoutMs,
+      data = scalaMap)
+  }
 
   def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
 
-  def sizeInBytes(): Int = underlying.sizeInBytes
+  def sizeInBytes = underlying.sizeInBytes
 
-  override def toString: String =
-    underlying.toString
+  override def toString = underlying.toString
 
-  override def equals(other: Any): Boolean = underlying.equals(other)
+  override def equals(other: Any) = canEqual(other) && {
+    val otherProducerRequest = other.asInstanceOf[ProducerRequest]
+    underlying.equals(otherProducerRequest.underlying)
+  }
 
-  def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
+  def canEqual(other: Any) = other.isInstanceOf[ProducerRequest]
 
-  override def hashCode: Int = underlying.hashCode
+  override def hashCode = underlying.hashCode
 
 }
\ No newline at end of file
diff --git core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
index c1b9fb9..40fdbc6 100644
--- core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
+++ core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
@@ -17,9 +17,8 @@
 
 package kafka.javaapi.consumer
 
-import kafka.api.FetchRequest
-import kafka.javaapi.FetchResponse
 import kafka.utils.threadsafe
+import kafka.javaapi.FetchResponse
 
 /**
  * A consumer of kafka messages
@@ -37,10 +36,20 @@ class SimpleConsumer(val host: String,
   *  @param request  specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
   *  @return a set of fetched messages
   */
-  def fetch(request: FetchRequest): FetchResponse = {
+  def fetch(request: kafka.api.FetchRequest): FetchResponse = {
     import kafka.javaapi.Implicits._
     underlying.fetch(request)
   }
+
+  /**
+   *  Fetch a set of messages from a topic.
+   *
+   *  @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
+   *  @return a set of fetched messages
+   */
+  def fetch(request: kafka.javaapi.FetchRequest): FetchResponse = {
+    fetch(request.underlying)
+  }
 
   /**
   *  Get a list of valid offsets (up to maxSize) before the given time.
diff --git core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
deleted file mode 100644
index c1ff168..0000000
--- core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package kafka.javaapi.producer - -import kafka.producer.SyncProducerConfig -import kafka.javaapi.message.ByteBufferMessageSet -import kafka.api.{ProducerResponse, PartitionData, TopicData} - -class SyncProducer(syncProducer: kafka.producer.SyncProducer) { - - def this(config: SyncProducerConfig) = this(new kafka.producer.SyncProducer(config)) - - val underlying = syncProducer - - def send(producerRequest: kafka.javaapi.ProducerRequest): ProducerResponse = { - underlying.send(producerRequest.underlying) - } - - def send(topic: String, messages: ByteBufferMessageSet): ProducerResponse = { - val partitionData = Array[PartitionData]( new PartitionData(-1, messages.underlying) ) - val data = Array[TopicData]( new TopicData(topic, partitionData) ) - val producerRequest = new kafka.api.ProducerRequest(-1, "", 0, 0, data) - underlying.send(producerRequest) - } - - def close() { - underlying.close - } -} diff --git core/src/main/scala/kafka/producer/SyncProducer.scala core/src/main/scala/kafka/producer/SyncProducer.scala index 064b7f4..7083fdd 100644 --- core/src/main/scala/kafka/producer/SyncProducer.scala +++ core/src/main/scala/kafka/producer/SyncProducer.scala @@ -101,13 +101,12 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { * Send a message */ def send(producerRequest: ProducerRequest): ProducerResponse = { - for( topicData <- producerRequest.data ) { - for( partitionData <- topicData.partitionDataArray ) { - verifyMessageSize(partitionData.messages) - val setSize = partitionData.messages.sizeInBytes.asInstanceOf[Int] - trace("Got message set with " + setSize + " bytes to send") - } - } + producerRequest.data.foreach( topicAndPartitionData => { + val partitionData = topicAndPartitionData._2 + verifyMessageSize(partitionData.messages) + val setSize = partitionData.messages.sizeInBytes.asInstanceOf[Int] + trace("Got message set with " + setSize + " bytes to send") + }) val response = doSend(producerRequest) ProducerResponse.readFrom(response.buffer) } diff --git core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index d2e9529..abc7051 100644 --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -18,13 +18,13 @@ package kafka.producer.async import kafka.common._ -import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet} +import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} import kafka.producer._ import kafka.serializer.Encoder import kafka.utils.{Utils, Logging} -import scala.collection.Map +import scala.collection.{Seq, Map} import scala.collection.mutable.{ListBuffer, HashMap} -import kafka.api.{TopicMetadata, ProducerRequest, TopicData, PartitionData} +import kafka.api.{TopicMetadata, ProducerRequest, PartitionData} class DefaultEventHandler[K,V](config: ProducerConfig, @@ -73,12 +73,13 @@ class DefaultEventHandler[K,V](config: ProducerConfig, val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap) val failedTopicPartitions = send(brokerid, messageSetPerBroker) - for( (topic, partition) <- failedTopicPartitions ) { - eventsPerBrokerMap.get((topic, partition)) match { + failedTopicPartitions.foreach(topicPartition => { + eventsPerBrokerMap.get(topicPartition) match { case Some(data) => failedProduceRequests.appendAll(data) case None => // nothing + } - } + }) } } catch { case t: Throwable => error("Failed to send messages", t) @@ -93,8 +94,8 @@ 
class DefaultEventHandler[K,V](config: ProducerConfig, events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m)))) } - def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]] = { - val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] + def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[TopicPartition, Seq[ProducerData[K,Message]]]]] = { + val ret = new HashMap[Int, Map[TopicPartition, Seq[ProducerData[K,Message]]]] try { for (event <- events) { val topicPartitionsList = getPartitionListForTopic(event) @@ -106,23 +107,23 @@ class DefaultEventHandler[K,V](config: ProducerConfig, // postpone the failure until the send operation, so that requests for other brokers are handled correctly val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1) - var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null + var dataPerBroker: HashMap[TopicPartition, Seq[ProducerData[K,Message]]] = null ret.get(leaderBrokerId) match { case Some(element) => - dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]] + dataPerBroker = element.asInstanceOf[HashMap[TopicPartition, Seq[ProducerData[K,Message]]]] case None => - dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]] + dataPerBroker = new HashMap[TopicPartition, Seq[ProducerData[K,Message]]] ret.put(leaderBrokerId, dataPerBroker) } - val topicAndPartition = (event.getTopic, brokerPartition.partitionId) + val topicPartition = TopicPartition(event.getTopic, brokerPartition.partitionId) var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null - dataPerBroker.get(topicAndPartition) match { + dataPerBroker.get(topicPartition) match { case Some(element) => dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]] case None => dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]] - dataPerBroker.put(topicAndPartition, dataPerTopicPartition) + dataPerBroker.put(topicPartition, dataPerTopicPartition) } dataPerTopicPartition.append(event) } @@ -170,35 +171,30 @@ class DefaultEventHandler[K,V](config: ProducerConfig, * @param messagesPerTopic the messages as a map from (topic, partition) -> messages * @return the set (topic, partitions) messages which incurred an error sending or processing */ - private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Seq[(String, Int)] = { + private def send(brokerId: Int, messagesPerTopic: Map[TopicPartition, ByteBufferMessageSet]) = { if(brokerId < 0) { - warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic)) + warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic)) messagesPerTopic.keys.toSeq } else if(messagesPerTopic.size > 0) { - val topics = new HashMap[String, ListBuffer[PartitionData]]() - for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) { - val partitionData = topics.getOrElseUpdate(topicName, new ListBuffer[PartitionData]()) - partitionData.append(new PartitionData(partitionId, messagesSet)) + val topicPartitionDataPairs = messagesPerTopic.toSeq.map { + case (topicPartition, messages) => + (topicPartition, new PartitionData(topicPartition.partitionId, messages)) } - val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)).toArray val producerRequest = new ProducerRequest(config.correlationId, 
config.clientId, config.requiredAcks, - config.requestTimeoutMs, topicData) + config.requestTimeoutMs, Map(topicPartitionDataPairs:_*)) try { val syncProducer = producerPool.getProducer(brokerId) val response = syncProducer.send(producerRequest) - trace("producer sent messages for topics %s to broker %d on %s:%d" + trace("Producer sent messages for topics %s to broker %d on %s:%d" .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port)) - var msgIdx = -1 - val errors = new ListBuffer[(String, Int)] - for( topic <- topicData; partition <- topic.partitionDataArray ) { - msgIdx += 1 - if(msgIdx > response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError) - errors.append((topic.topic, partition.partition)) - } - errors + if (response.status.size != producerRequest.data.size) + throw new KafkaException("Incomplete response (%s) for producer request (%s)" + .format(response, producerRequest)) + response.status.filter(_._2.error != ErrorMapping.NoError).toSeq + .map(partitionStatus => partitionStatus._1) } catch { - case e => - warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), e) + case t: Throwable => + warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), t) messagesPerTopic.keys.toSeq } } else { @@ -206,7 +202,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, } } - private def groupMessagesToSet(eventsPerTopicAndPartition: Map[(String,Int), Seq[ProducerData[K,Message]]]): Map[(String, Int), ByteBufferMessageSet] = { + private def groupMessagesToSet(eventsPerTopicAndPartition: Map[TopicPartition, Seq[ProducerData[K,Message]]]) = { /** enforce the compressed.topics config here. * If the compression codec is anything other than NoCompressionCodec, * Enable compression only for specified topics if any @@ -215,32 +211,30 @@ class DefaultEventHandler[K,V](config: ProducerConfig, */ val messagesPerTopicPartition = eventsPerTopicAndPartition.map { e => - val topicAndPartition = e._1 + val topicPartition = e._1 val produceData = e._2 val messages = new ListBuffer[Message] produceData.map(p => messages.appendAll(p.getData)) - ( topicAndPartition, + ( topicPartition, config.compressionCodec match { case NoCompressionCodec => - trace("Sending %d messages with no compression to topic %s on partition %d" - .format(messages.size, topicAndPartition._1, topicAndPartition._2)) + trace("Sending %d messages with no compression to %s".format(messages.size, topicPartition)) new ByteBufferMessageSet(NoCompressionCodec, messages: _*) case _ => config.compressedTopics.size match { case 0 => - trace("Sending %d messages with compression codec %d to topic %s on partition %d" - .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2)) + trace("Sending %d messages with compression codec %d to %s" + .format(messages.size, config.compressionCodec.codec, topicPartition)) new ByteBufferMessageSet(config.compressionCodec, messages: _*) case _ => - if(config.compressedTopics.contains(topicAndPartition._1)) { - trace("Sending %d messages with compression codec %d to topic %s on partition %d" - .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2)) + if(config.compressedTopics.contains(topicPartition.topic)) { + trace("Sending %d messages with compression codec %d to %s" + .format(messages.size, config.compressionCodec.codec, topicPartition)) new ByteBufferMessageSet(config.compressionCodec, messages: _*) } else { - 
trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s" - .format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1, - config.compressedTopics.toString)) + trace("Sending %d messages to %s with no compression as it is not in compressed.topics - %s" + .format(messages.size, topicPartition, config.compressedTopics.toString)) new ByteBufferMessageSet(NoCompressionCodec, messages: _*) } } diff --git core/src/main/scala/kafka/server/AbstractFetcherThread.scala core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 1a11d96..6a7924e 100644 --- core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -19,7 +19,7 @@ package kafka.server import kafka.cluster.Broker import kafka.consumer.SimpleConsumer -import kafka.common.ErrorMapping +import kafka.common.{TopicPartition, ErrorMapping} import collection.mutable import kafka.message.ByteBufferMessageSet import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder} @@ -32,7 +32,7 @@ import kafka.utils.ShutdownableThread abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int, fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1) extends ShutdownableThread(name) { - private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map + private val fetchMap = new mutable.HashMap[TopicPartition, Long] // a (topic, partitionId) -> offset map private val fetchMapLock = new Object val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize) // callbacks to be defined in subclass @@ -44,7 +44,7 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket def handleOffsetOutOfRange(topic: String, partitionId: Int): Long // deal with partitions with errors, potentially due to leadership changes - def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) + def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) override def shutdown(){ super.shutdown() @@ -59,12 +59,15 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket minBytes(minBytes) fetchMapLock synchronized { - for ( ((topic, partitionId), offset) <- fetchMap ) - builder.addFetch(topic, partitionId, offset.longValue, fetchSize) + fetchMap.foreach { + case((topicPartition, offset)) => + builder.addFetch(topicPartition.topic, topicPartition.partitionId, + offset, fetchSize) + } } val fetchRequest = builder.build() - val partitionsWithError = new mutable.HashSet[(String, Int)] + val partitionsWithError = new mutable.HashSet[TopicPartition] var response: FetchResponse = null try { trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) @@ -83,34 +86,32 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket if (response != null) { // process fetched data fetchMapLock synchronized { - for ( topicData <- response.data ) { - for ( partitionData <- topicData.partitionDataArray) { - val topic = topicData.topic - val partitionId = partitionData.partition - val key = (topic, partitionId) - val currentOffset = fetchMap.get(key) + response.data.foreach { + case(topicPartition, partitionData) => + val (topic, partitionId) = topicPartition.asTuple + val currentOffset = fetchMap.get(topicPartition) if (currentOffset.isDefined) { 
              partitionData.error match {
                case ErrorMapping.NoError =>
                  processPartitionData(topic, currentOffset.get, partitionData)
                  val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
-                  fetchMap.put(key, newOffset)
+                  fetchMap.put(topicPartition, newOffset)
                case ErrorMapping.OffsetOutOfRangeCode =>
                  val newOffset = handleOffsetOutOfRange(topic, partitionId)
-                  fetchMap.put(key, newOffset)
+                  fetchMap.put(topicPartition, newOffset)
                  warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
-                    .format(currentOffset.get, topic, partitionId, newOffset))
+                       .format(currentOffset.get, topic, partitionId, newOffset))
                case _ =>
                  error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
-                    ErrorMapping.exceptionFor(partitionData.error))
-                  partitionsWithError += key
-                  fetchMap.remove(key)
+                        ErrorMapping.exceptionFor(partitionData.error))
+                  partitionsWithError += topicPartition
+                  fetchMap.remove(topicPartition)
              }
            }
-          }
        }
      }
    }
+
    if (partitionsWithError.size > 0) {
      debug("handling partitions with error for %s".format(partitionsWithError))
      handlePartitionsWithErrors(partitionsWithError)
@@ -119,19 +120,19 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
 
  def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
    fetchMapLock synchronized {
-      fetchMap.put((topic, partitionId), initialOffset)
+      fetchMap.put(TopicPartition(topic, partitionId), initialOffset)
    }
  }
 
  def removePartition(topic: String, partitionId: Int) {
    fetchMapLock synchronized {
-      fetchMap.remove((topic, partitionId))
+      fetchMap.remove(TopicPartition(topic, partitionId))
    }
  }
 
  def hasPartition(topic: String, partitionId: Int): Boolean = {
    fetchMapLock synchronized {
-      fetchMap.get((topic, partitionId)).isDefined
+      fetchMap.get(TopicPartition(topic, partitionId)).isDefined
    }
  }
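
The fetcher thread above now keeps all of its offset bookkeeping in a single TopicPartition-keyed mutable map guarded by a lock object. A minimal standalone sketch of that pattern (OffsetTable and the local TopicPartition stand-in are illustrative, not the patch's own classes):

    import scala.collection.mutable

    case class TopicPartition(topic: String, partitionId: Int) {
      def asTuple = (topic, partitionId)
    }

    // One lock, one map: the same shape as fetchMap/fetchMapLock above.
    class OffsetTable {
      private val fetchMap = new mutable.HashMap[TopicPartition, Long]
      private val lock = new Object

      def add(topic: String, partition: Int, offset: Long): Unit = lock synchronized {
        fetchMap.put(TopicPartition(topic, partition), offset)
      }
      // advance an offset by the number of valid bytes fetched, if present
      def advance(tp: TopicPartition, validBytes: Long): Unit = lock synchronized {
        fetchMap.get(tp).foreach(current => fetchMap.put(tp, current + validBytes))
      }
      def remove(topic: String, partition: Int): Unit = lock synchronized {
        fetchMap.remove(TopicPartition(topic, partition))
      }
      def has(topic: String, partition: Int): Boolean = lock synchronized {
        fetchMap.get(TopicPartition(topic, partition)).isDefined
      }
    }

    object OffsetTableDemo extends App {
      val table = new OffsetTable
      table.add("t1", 0, 0L)
      table.advance(TopicPartition("t1", 0), 42L)
      println(table.has("t1", 0)) // true
    }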
diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala
index d9cd6c6..4d30969 100644
--- core/src/main/scala/kafka/server/KafkaApis.scala
+++ core/src/main/scala/kafka/server/KafkaApis.scala
@@ -20,7 +20,6 @@ package kafka.server
 import java.io.IOException
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
-import kafka.common._
 import kafka.message._
 import kafka.network._
 import kafka.utils.{Pool, SystemTime, Logging}
@@ -32,6 +31,7 @@ import kafka.network.RequestChannel.Response
 import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
 import org.I0Itec.zkclient.ZkClient
+import kafka.common._
 
 /**
  * Logic to handle the various Kafka requests
 */
@@ -41,13 +41,12 @@ class KafkaApis(val requestChannel: RequestChannel,
                val zkClient: ZkClient,
                brokerId: Int) extends Logging {
 
-  private val metricsGroup = brokerId.toString
-  private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId)
-  private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
+  private val producerRequestPurgatory = new ProducerRequestPurgatory
+  private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
   private val delayedRequestMetrics = new DelayedRequestMetrics
 
   private val requestLogger = Logger.getLogger("kafka.request.logger")
-  this.logIdent = "[KafkaApi on Broker " + brokerId + "], "
+  this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
   /**
   * Top-level method that handles all requests and multiplexes to the right api
   */
@@ -77,7 +76,7 @@ class KafkaApis(val requestChannel: RequestChannel,
  }
 
-  def handleStopReplicaRequest(request: RequestChannel.Request){
+  def handleStopReplicaRequest(request: RequestChannel.Request) {
    val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer)
    if(requestLogger.isTraceEnabled)
      requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
@@ -94,18 +93,18 @@ class KafkaApis(val requestChannel: RequestChannel,
  }
 
  /**
-   * Check if the partitionDataArray from a produce request can unblock any
+   * Check if a partitionData from a produce request can unblock any
   * DelayedFetch requests.
   */
-  def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) {
-    var satisfied = new mutable.ArrayBuffer[DelayedFetch]
-    for(partitionData <- partitionDatas)
-      satisfied ++= fetchRequestPurgatory.update(RequestKey(topic, partitionData.partition), null)
-    trace("Producer request to %s unblocked %d fetch requests.".format(topic, satisfied.size))
+  def maybeUnblockDelayedFetchRequests(topic: String, partitionData: PartitionData) {
+    val partition = partitionData.partition
+    val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), null)
+    trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
+
    // send any newly unblocked responses
    for(fetchReq <- satisfied) {
      val topicData = readMessageSets(fetchReq.fetch)
-      val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
+      val response = FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
      val fromFollower = fetchReq.fetch.replicaId != FetchRequest.NonFollowerId
      delayedRequestMetrics.recordDelayedFetchSatisfied(
@@ -125,28 +124,23 @@ class KafkaApis(val requestChannel: RequestChannel,
      requestLogger.trace("Handling producer request " + request.toString)
    trace("Handling producer request " + request.toString)
 
-    val response = produceToLocalLog(produceRequest)
+    val localProduceResponse = produceToLocalLog(produceRequest)
    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
-
-    for (topicData <- produceRequest.data)
-      maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)
-
+
+    produceRequest.data.foreach(partitionAndData =>
+      maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._2))
+
    if (produceRequest.requiredAcks == 0 ||
        produceRequest.requiredAcks == 1 ||
        produceRequest.data.size <= 0)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(localProduceResponse)))
    else {
      // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val producerRequestKeys = produceRequest.data.flatMap(topicData => {
-        val topic = topicData.topic
-        topicData.partitionDataArray.map(partitionData => {
-          RequestKey(topic, partitionData.partition)
-        })
-      })
+      val producerRequestKeys = produceRequest.data.keys.map(
+        topicAndPartition => new RequestKey(topicAndPartition)).toSeq
 
      val delayedProduce = new DelayedProduce(
-        producerRequestKeys, request,
-        response.errors, response.offsets,
+        producerRequestKeys, request, localProduceResponse,
        produceRequest, produceRequest.ackTimeoutMs.toLong)
      producerRequestPurgatory.watch(delayedProduce)
@@ -170,43 +164,44 @@ class KafkaApis(val requestChannel: RequestChannel,
   */
  private def produceToLocalLog(request: ProducerRequest): ProducerResponse = {
    trace("Produce [%s] to local log ".format(request.toString))
-    val requestSize = request.topicPartitionCount
-    val errors = new Array[Short](requestSize)
-    val offsets = new Array[Long](requestSize)
-
-    var msgIndex = -1
-    for(topicData <- request.data) {
-      for(partitionData <- topicData.partitionDataArray) {
-        msgIndex += 1
-        BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes)
-        BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes)
-        try {
-          val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition)
-          val log = localReplica.log.get
-          log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
-          // we may need to increment high watermark since ISR could be down to 1
-          localReplica.partition.maybeIncrementLeaderHW(localReplica)
-          offsets(msgIndex) = log.logEndOffset
-          errors(msgIndex) = ErrorMapping.NoError.toShort
-          trace("%d bytes written to logs, nextAppendOffset = %d"
-            .format(partitionData.messages.sizeInBytes, offsets(msgIndex)))
-        } catch {
-          case e =>
-            BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
-            BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
-            error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e)
-            e match {
-              case _: IOException =>
-                fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
-                System.exit(1)
-              case _ =>
-                errors(msgIndex) = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort
-                offsets(msgIndex) = -1
-            }
-        }
+
+    val localErrorsAndOffsets = request.data.map (topicAndPartitionData => {
+      val (topic, partitionData) = (topicAndPartitionData._1.topic, topicAndPartitionData._2)
+
+      val messagesSize = partitionData.messages.sizeInBytes
+      BrokerTopicStat.getBrokerTopicStat(topic).recordBytesIn(messagesSize)
+      BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(messagesSize)
+      try {
+        val localReplica = replicaManager.getLeaderReplicaIfLocal(topic, partitionData.partition)
+        val log = localReplica.log.get
+        log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+        // we may need to increment high watermark since ISR could be down to 1
+        localReplica.partition.maybeIncrementLeaderHW(localReplica)
+        val responseStatus = ProducerResponseStatus(ErrorMapping.NoError, log.logEndOffset)
+        trace("%d bytes written to logs, nextAppendOffset = %d"
+          .format(messagesSize, responseStatus.nextOffset))
+        (TopicPartition(topic, partitionData.partition), responseStatus)
+      } catch {
+        case e: Throwable =>
+          BrokerTopicStat.getBrokerTopicStat(topic).recordFailedProduceRequest
+          BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
+          error("Error processing ProducerRequest on %s:%d".format(topic, partitionData.partition), e)
+          e match {
+            case _: IOException =>
+              fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
+              // use exit (which returns Nothing) rather than System.exit (which returns Unit) so this arm type-checks
+              exit(1)
+            case _ =>
+              val (error, offset) = (ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1L)
+              (TopicPartition(topic, partitionData.partition), ProducerResponseStatus(error, offset))
+          }
      }
    }
-    new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
+    )
+
+    ProducerResponse(request.versionId, request.correlationId, localErrorsAndOffsets)
  }
 
  /**
@@ -218,26 +213,15 @@ class KafkaApis(val requestChannel: RequestChannel,
      requestLogger.trace("Handling fetch request " + fetchRequest.toString)
    trace("Handling fetch request " + fetchRequest.toString)
 
-    // validate the request
-    try {
-      fetchRequest.validate()
-    } catch {
-      case e:FetchRequestFormatException =>
-        val response = new FetchResponse(fetchRequest.versionId, fetchRequest.correlationId, Array.empty)
-        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response))
-        requestChannel.sendResponse(channelResponse)
-    }
-
    if(fetchRequest.replicaId != FetchRequest.NonFollowerId) {
      maybeUpdatePartitionHW(fetchRequest)
      // after updating HW, some delayed produce requests may be unblocked
      var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
-      fetchRequest.offsetInfo.foreach(topicOffsetInfo => {
-        topicOffsetInfo.partitions.foreach(partition => {
-          val key = RequestKey(topicOffsetInfo.topic, partition)
+      fetchRequest.requestInfo.foreach {
+        case (topicPartition, _) =>
+          val key = new RequestKey(topicPartition)
          satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
-        })
-      })
+      }
      debug("Replica %d fetch unblocked %d producer requests."
        .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
      satisfiedProduceRequests.foreach(_.respond())
@@ -250,13 +234,13 @@ class KafkaApis(val requestChannel: RequestChannel,
       fetchRequest.numPartitions <= 0) {
      val topicData = readMessageSets(fetchRequest)
      debug("Returning fetch response %s for fetch request with correlation id %d".format(
-        topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
-      val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
+        topicData.values.map(_.error).mkString(","), fetchRequest.correlationId))
+      val response = FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
    } else {
      debug("Putting fetch request into purgatory")
      // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val delayedFetchKeys = fetchRequest.offsetInfo.flatMap(o => o.partitions.map(RequestKey(o.topic, _)))
+      val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_))
      val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait)
      fetchRequestPurgatory.watch(delayedFetch)
    }
@@ -266,52 +250,46 @@ class KafkaApis(val requestChannel: RequestChannel,
  /**
   * Calculate the number of available bytes for the given fetch request
   */
  private def availableFetchBytes(fetchRequest: FetchRequest): Long = {
-    var totalBytes = 0L
-    for(offsetDetail <- fetchRequest.offsetInfo) {
-      for(i <- 0 until offsetDetail.partitions.size) {
+    val totalBytes = fetchRequest.requestInfo.foldLeft(0L)((folded, curr) => {
+      folded +
+      {
+        val (topic, partition) = (curr._1.topic, curr._1.partitionId)
+        val (offset, fetchSize) = (curr._2.offset, curr._2.fetchSize)
        try {
-          val localReplica = replicaManager.getReplica(offsetDetail.topic, offsetDetail.partitions(i))
+          val localReplica = replicaManager.getReplica(topic, partition)
          val available = localReplica match {
-            case Some(replica) => max(0, replica.log.get.logEndOffset - offsetDetail.offsets(i))
+            case Some(replica) => max(0, replica.log.get.logEndOffset - offset)
            case None => 0
          }
-          totalBytes += math.min(offsetDetail.fetchSizes(i), available)
+          math.min(fetchSize, available)
        } catch {
-          case e: UnknownTopicOrPartitionException =>
-            info("Invalid partition %d in fetch request from client %d."
-              .format(offsetDetail.partitions(i), fetchRequest.clientId))
+          case e: UnknownTopicOrPartitionException => {
+            info("Invalid partition %d in fetch request from client %s.".format(partition, fetchRequest.clientId))
+            0
+          }
        }
      }
-    }
+    })
    trace(totalBytes + " available bytes for fetch request.")
    totalBytes
  }
 
  private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) {
-    val offsets = fetchRequest.offsetInfo
-    debug("Act on update partition HW, check offset detail: %s ".format(offsets))
-    for(offsetDetail <- offsets) {
-      val topic = offsetDetail.topic
-      val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets)
-      for( (partition, offset) <- (partitions, offsets).zipped.map((_,_))) {
-        replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
-      }
-    }
+    debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest))
+    fetchRequest.requestInfo.foreach(info => {
+      val (topic, partition, offset) = (info._1.topic, info._1.partitionId, info._2.offset)
+      replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
+    })
  }
 
  /**
-   * Read from all the offset details given and produce an array of topic datas
+   * Read from all the offset details given and return a map of
+   * (topic, partition) -> PartitionData
   */
-  private def readMessageSets(fetchRequest: FetchRequest): Array[TopicData] = {
-    val offsets = fetchRequest.offsetInfo
-    val fetchedData = new mutable.ArrayBuffer[TopicData]()
-
-    for(offsetDetail <- offsets) {
-      val info = new mutable.ArrayBuffer[PartitionData]()
-      val topic = offsetDetail.topic
-      val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
-      for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
-        val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
+  private def readMessageSets(fetchRequest: FetchRequest) =
+    fetchRequest.requestInfo.map {
+      case (TopicPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
+        val partitionData = readMessageSet(topic, partition, offset, fetchSize) match {
          case Left(err) =>
            BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
            BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
@@ -326,18 +304,14 @@ class KafkaApis(val requestChannel: RequestChannel,
            val leaderReplica = replicaManager.getReplica(topic, partition).get
            if (fetchRequest.replicaId != FetchRequest.NonFollowerId) {
              debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]"
-                .format(topic, partition, fetchRequest.replicaId))
+                    .format(topic, partition, fetchRequest.replicaId))
              debug("Leader returning %d messages for topic %s partition %d to follower %d"
-                .format(messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
+                    .format(messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
            }
            new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark, messages)
        }
-        info.append(partitionInfo)
-      }
-      fetchedData.append(new TopicData(topic, info.toArray))
+        (TopicPartition(topic, partition), partitionData)
    }
-    fetchedData.toArray
-  }
 
  /**
   * Read from a single topic/partition at the given offset
@@ -446,8 +420,14 @@ class KafkaApis(val requestChannel: RequestChannel,
  private [kafka] case class RequestKey(topic: String, partition: Int)
          extends MetricKey {
+
+    def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partitionId)
+
+    def topicPartition = TopicPartition(topic, partition)
+
    override def keyLabel = "%s-%d".format(topic, partition)
  }
+
  /**
   * A delayed fetch request
   */
@@ -457,9 +437,9 @@ class KafkaApis(val requestChannel: RequestChannel,
  /**
   * A holding pen for fetch requests waiting to be satisfied
   */
-  class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
+  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
 
-    this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId)
+    this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
 
    /**
@@ -473,7 +453,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     */
    def expire(delayed: DelayedFetch) {
      val topicData = readMessageSets(delayed.fetch)
-      val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
+      val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
      val fromFollower = delayed.fetch.replicaId != FetchRequest.NonFollowerId
      delayedRequestMetrics.recordDelayedFetchExpired(fromFollower, response)
      requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
@@ -482,48 +462,43 @@ class KafkaApis(val requestChannel: RequestChannel,
 
  class DelayedProduce(keys: Seq[RequestKey],
                       request: RequestChannel.Request,
-                       localErrors: Array[Short],
-                       requiredOffsets: Array[Long],
+                       localProduceResponse: ProducerResponse,
                       val produce: ProducerRequest,
                       delayMs: Long)
    extends DelayedRequest(keys, request, delayMs) with Logging {
+    private val initialErrorsAndOffsets = localProduceResponse.status
 
    /**
     * Map of (topic, partition) -> partition status
     * The values in this map don't need to be synchronized since updates to the
     * values are effectively synchronized by the ProducerRequestPurgatory's
     * update method
     */
-    private [kafka] val partitionStatus = keys.map(key => {
-      val keyIndex = keys.indexOf(key)
+    private [kafka] val partitionStatus = keys.map(requestKey => {
+      val producerResponseStatus = initialErrorsAndOffsets(TopicPartition(requestKey.topic, requestKey.partition))
      // if there was an error in writing to the local replica's log, then don't
      // wait for acks on this partition
-      val acksPending =
-        if (localErrors(keyIndex) == ErrorMapping.NoError) {
+      val (acksPending, error, nextOffset) =
+        if (producerResponseStatus.error == ErrorMapping.NoError) {
          // Timeout error state will be cleared when requiredAcks are received
-          localErrors(keyIndex) = ErrorMapping.RequestTimedOutCode
-          true
+          (true, ErrorMapping.RequestTimedOutCode, producerResponseStatus.nextOffset)
        }
-        else
-          false
+        else (false, producerResponseStatus.error, producerResponseStatus.nextOffset)
 
-      val initialStatus = new PartitionStatus(acksPending, localErrors(keyIndex), requiredOffsets(keyIndex))
-      trace("Initial partition status for %s = %s".format(key, initialStatus))
-      (key, initialStatus)
+      val initialStatus = PartitionStatus(acksPending, error, nextOffset)
+      trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus))
+      (requestKey, initialStatus)
    }).toMap
 
    def respond() {
-      val errorsAndOffsets: (List[Short], List[Long]) = (
-        keys.foldRight
-          ((List[Short](), List[Long]()))
-          ((key: RequestKey, result: (List[Short], List[Long])) => {
-            val status = partitionStatus(key)
-            (status.error :: result._1, status.requiredOffset :: result._2)
-          })
-      )
-      val response = new ProducerResponse(produce.versionId, produce.correlationId,
-                                          errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
+
+      val finalErrorsAndOffsets = initialErrorsAndOffsets.map(
+        status => {
+          val pstat = partitionStatus(new RequestKey(status._1))
+          (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset))
+        })
+
+      val response = ProducerResponse(produce.versionId, produce.correlationId, finalErrorsAndOffsets)
 
      requestChannel.sendResponse(new RequestChannel.Response(
        request, new BoundedByteBufferSend(response)))
@@ -559,12 +534,11 @@ class KafkaApis(val requestChannel: RequestChannel,
          fetchPartitionStatus.error = ErrorMapping.NoError
        }
        if (!fetchPartitionStatus.acksPending) {
-          val topicData = produce.data.find(_.topic == topic).get
-          val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get
+          val partitionData = produce.data(followerFetchRequestKey.topicPartition)
          delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key,
                                                                 durationNs,
-                                                                partitionData.sizeInBytes)
-          maybeUnblockDelayedFetchRequests(topic, Array(partitionData))
+                                                                partitionData.messages.sizeInBytes.toInt)
+          maybeUnblockDelayedFetchRequests(topic, partitionData)
        }
      }
@@ -575,7 +549,7 @@ class KafkaApis(val requestChannel: RequestChannel,
      satisfied
    }
 
-    class PartitionStatus(var acksPending: Boolean,
+    case class PartitionStatus(var acksPending: Boolean,
                          var error: Short,
                          val requiredOffset: Long) {
      def setThisBrokerNotLeader() {
@@ -593,9 +567,9 @@ class KafkaApis(val requestChannel: RequestChannel,
  /**
   * A holding pen for produce requests waiting to be satisfied.
   */
-  private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
+  private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
 
-    this.logIdent = "[ProducerRequestPurgatory-%d], ".format(brokerId)
+    this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId)
 
    protected def checkSatisfied(followerFetchRequestKey: RequestKey,
                                 delayedProduce: DelayedProduce) =
@@ -705,17 +679,15 @@ class KafkaApis(val requestChannel: RequestChannel,
        else aggregateNonFollowerFetchRequestMetrics
      metrics.throughputMeter.mark(response.sizeInBytes)
-      response.topicMap.foreach(topicAndData => {
-        val topic = topicAndData._1
-        topicAndData._2.partitionDataArray.foreach(partitionData => {
-          val key = RequestKey(topic, partitionData.partition)
+      response.data.foreach {
+        case(topicPartition, partitionData) =>
+          val key = new RequestKey(topicPartition)
          val keyMetrics = if (forFollower)
                             followerFetchRequestMetricsForKey.getAndMaybePut(key)
                           else
                             nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key)
          keyMetrics.throughputMeter.mark(partitionData.sizeInBytes)
-        })
-      })
+      }
  }
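
The KafkaApis changes above all follow one pattern: produceToLocalLog folds each partition's append attempt, success or failure, into a single Map[TopicPartition, ProducerResponseStatus]. A compact standalone sketch of that per-partition try/catch-into-a-map pattern (the error codes, append stub, and stand-in case classes below are assumptions for illustration, not Kafka's own API):

    case class TopicPartition(topic: String, partitionId: Int)
    case class ProducerResponseStatus(error: Short, nextOffset: Long)

    object ProduceSketch extends App {
      val NoError: Short = 0
      val UnknownCode: Short = 1

      // pretend log append: fails for partition 1, returns the new end offset otherwise
      def append(tp: TopicPartition, bytes: Int): Long =
        if (tp.partitionId == 1) throw new RuntimeException("not leader") else 100L + bytes

      val incoming = Map(TopicPartition("t1", 0) -> 10, TopicPartition("t1", 1) -> 20)

      // each entry maps to either (NoError, nextOffset) or (errorCode, -1),
      // so one partition's failure never loses the others' results
      val status: Map[TopicPartition, ProducerResponseStatus] = incoming.map {
        case (tp, bytes) =>
          try tp -> ProducerResponseStatus(NoError, append(tp, bytes))
          catch { case _: RuntimeException => tp -> ProducerResponseStatus(UnknownCode, -1L) }
      }
      status.foreach(println)
    }

Because the response is keyed rather than positional, DelayedProduce can look up a partition's status directly instead of tracking parallel array indices, which is what the partitionStatus map above does.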
diff --git core/src/main/scala/kafka/server/ReplicaFetcherThread.scala core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 7c8ff4e..541ee41 100644
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -20,6 +20,8 @@ package kafka.server
 import kafka.api.{OffsetRequest, PartitionData}
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
+import kafka.common.TopicPartition
+
 class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager)
   extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
@@ -56,7 +58,7 @@ class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: Kafk
   }
 
   // any logic for partitions whose leader has changed
-  def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {
+  def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) {
     // no handler needed since the controller will make the changes accordingly
   }
 }
diff --git core/src/main/scala/kafka/utils/Utils.scala core/src/main/scala/kafka/utils/Utils.scala
index fe0a08b..bab8b13 100644
--- core/src/main/scala/kafka/utils/Utils.scala
+++ core/src/main/scala/kafka/utils/Utils.scala
@@ -26,7 +26,6 @@ import java.util.zip.CRC32
 import javax.management._
 import scala.collection._
 import scala.collection.mutable
-import kafka.message.{NoCompressionCodec, CompressionCodec}
 import org.I0Itec.zkclient.ZkClient
 import java.util.{Random, Properties}
 import joptsimple.{OptionSpec, OptionSet, OptionParser}
@@ -369,7 +368,7 @@ object Utils extends Logging {
   /**
    * Read an unsigned integer from the given position without modifying the buffers
    * position
-   * @param The buffer to read from
+   * @param buffer the buffer to read from
    * @param index the index from which to read the integer
    * @return The integer read, as a long to avoid signedness
    */
diff --git core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 1a6be9f..e328a5c 100644
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -19,7 +19,7 @@ package kafka.integration
 
 import java.nio.ByteBuffer
 import junit.framework.Assert._
-import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder}
+import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import java.util.Properties
 import kafka.producer.{ProducerData, Producer, ProducerConfig}
@@ -31,8 +31,8 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.common.{ErrorMapping, UnknownTopicOrPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
 import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.common.{TopicPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -77,27 +77,11 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     assertEquals(request, deserializedRequest)
   }
 
-  def testFetchRequestEnforcesUniqueTopicsForOffsetDetails() {
-    val offsets = Array(
-      new OffsetDetail("topic1", Array(0, 1, 2), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
-      new OffsetDetail("topic2", Array(0, 1, 2), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
-      new OffsetDetail("topic1", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
-      new OffsetDetail("topic2", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000))
-    )
-    val request = new FetchRequest(offsetInfo = offsets)
-    try {
-      consumer.fetch(request)
-      fail("FetchRequest should throw FetchRequestFormatException due to duplicate topics")
-    } catch {
-      case e: FetchRequestFormatException => "success"
-    }
-  }
-
   def testEmptyFetchRequest() {
-    val offsets = Array[OffsetDetail]()
-    val request = new FetchRequest(offsetInfo = offsets)
+    val partitionRequests = immutable.Map[TopicPartition, PartitionFetchInfo]()
+    val request = new FetchRequest(requestInfo = partitionRequests)
     val fetched = consumer.fetch(request)
-    assertTrue(fetched.errorCode == ErrorMapping.NoError && fetched.data.size == 0)
+    assertTrue(!fetched.hasError && fetched.data.size == 0)
   }
 
   def testDefaultEncoderProducerAndFetch() {
diff --git core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala
deleted file mode 100644
index 3b1aa2a..0000000
--- core/src/test/scala/unit/kafka/javaapi/integration/ProducerConsumerTestHarness.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.javaapi.integration
-
-import org.scalatest.junit.JUnit3Suite
-import java.util.Properties
-import kafka.producer.SyncProducerConfig
-import kafka.javaapi.producer.SyncProducer
-import kafka.javaapi.consumer.SimpleConsumer
-
-trait ProducerConsumerTestHarness extends JUnit3Suite {
-
-  val port: Int
-  val host = "localhost"
-  var producer: SyncProducer = null
-  var consumer: SimpleConsumer = null
-
-  override def setUp() {
-    val props = new Properties()
-    props.put("host", host)
-    props.put("port", port.toString)
-    props.put("buffer.size", "65536")
-    props.put("connect.timeout.ms", "100000")
-    props.put("reconnect.interval", "10000")
-    producer = new SyncProducer(new SyncProducerConfig(props))
-    consumer = new SimpleConsumer(host,
-                                  port,
-                                  1000000,
-                                  64*1024)
-    super.setUp
-  }
-
-  override def tearDown() {
-    super.tearDown
-    producer.close()
-    consumer.close()
-  }
-}
diff --git core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
index 16057dc..7138435 100644
--- core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
+++ core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.network;
+package kafka.network
 
 import org.junit._
 import org.scalatest.junit.JUnitSuite
@@ -24,31 +24,45 @@ import java.nio.ByteBuffer
 import kafka.api._
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.cluster.Broker
-import kafka.common.ErrorMapping
 import collection.mutable._
+import kafka.common.{TopicPartition, ErrorMapping}
+
 
 object RpcDataSerializationTestUtils{
   private val topic1 = "test1"
   private val topic2 = "test2"
-  private val leader1 = 0;
+  private val leader1 = 0
   private val isr1 = List(0, 1, 2)
-  private val leader2 = 0;
+  private val leader2 = 0
   private val isr2 = List(0, 2, 3)
   private val partitionData0 = new PartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes)))
   private val partitionData1 = new PartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes)))
   private val partitionData2 = new PartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes)))
   private val partitionData3 = new PartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes)))
   private val partitionDataArray = Array(partitionData0, partitionData1, partitionData2, partitionData3)
-  private val topicData1 = new TopicData(topic1, partitionDataArray)
-  private val topicData2 = new TopicData(topic2, partitionDataArray)
-  private val topicDataArray = Array(topicData1, topicData2)
-  private val offsetDetail1 = new OffsetDetail(topic1, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100))
-  private val offsetDetail2 = new OffsetDetail(topic2, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100))
-  private val offsetDetailSeq = Seq(offsetDetail1, offsetDetail2)
-  private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
-  private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
-  private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
-  private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
+
+  private val topicData = {
+    val groupedData = Array(topic1, topic2).flatMap(topic =>
+      partitionDataArray.map(partitionData =>
+        (TopicPartition(topic, partitionData.partition), partitionData)))
+    collection.immutable.Map(groupedData:_*)
+  }
+
+  private val requestInfos = collection.immutable.Map(
+    TopicPartition(topic1, 0) -> PartitionFetchInfo(1000, 100),
+    TopicPartition(topic1, 1) -> PartitionFetchInfo(2000, 100),
+    TopicPartition(topic1, 2) -> PartitionFetchInfo(3000, 100),
+    TopicPartition(topic1, 3) -> PartitionFetchInfo(4000, 100),
+    TopicPartition(topic2, 0) -> PartitionFetchInfo(1000, 100),
+    TopicPartition(topic2, 1) -> PartitionFetchInfo(2000, 100),
+    TopicPartition(topic2, 2) -> PartitionFetchInfo(3000, 100),
+    TopicPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
+  )
+
+  private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
   private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
   private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
   private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
@@ -78,19 +92,21 @@ object RpcDataSerializationTestUtils{
   }
 
   def createTestProducerRequest: ProducerRequest = {
-    new ProducerRequest(1, "client 1", 0, 1000, topicDataArray)
+    new ProducerRequest(1, "client 1", 0, 1000, topicData)
   }
 
-  def createTestProducerResponse: ProducerResponse = {
-    new ProducerResponse(1, 1, Array(0.toShort, 0.toShort), Array(1000l, 2000l), 0)
-  }
+  def createTestProducerResponse: ProducerResponse =
+    ProducerResponse(1, 1, Map(
+      TopicPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001),
+      TopicPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001)
+    ))
 
   def createTestFetchRequest: FetchRequest = {
-    new FetchRequest(offsetInfo = offsetDetailSeq)
+    new FetchRequest(requestInfo = requestInfos)
   }
 
   def createTestFetchResponse: FetchResponse = {
-    new FetchResponse(1, 1, topicDataArray)
+    FetchResponse(1, 1, topicData)
   }
 
   def createTestOffsetRequest: OffsetRequest = {
@@ -154,7 +170,7 @@ class RpcDataSerializationTest extends JUnitSuite {
     assertEquals("The original and deserialzed stopReplicaResponse should be the same", stopReplicaResponse,
                  deserializedStopReplicaResponse)
 
-    buffer = ByteBuffer.allocate(producerRequest.sizeInBytes())
+    buffer = ByteBuffer.allocate(producerRequest.sizeInBytes)
     producerRequest.writeTo(buffer)
     buffer.rewind()
     val deserializedProducerRequest = ProducerRequest.readFrom(buffer)
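
The requestInfos fixture above enumerates its eight TopicPartition -> PartitionFetchInfo pairs by hand; the same map can be generated from the topic and partition ranges. A standalone sketch under local stand-in case classes (not the kafka.api/kafka.common types themselves):

    case class TopicPartition(topic: String, partitionId: Int)
    case class PartitionFetchInfo(offset: Long, fetchSize: Int)

    object FixtureSketch extends App {
      // offsets 1000..4000 for partitions 0..3, fetchSize 100, per topic,
      // matching the hand-written fixture above
      val requestInfos: Map[TopicPartition, PartitionFetchInfo] =
        (for {
          topic <- Seq("test1", "test2")
          partition <- 0 to 3
        } yield TopicPartition(topic, partition) -> PartitionFetchInfo(1000L * (partition + 1), 100)).toMap

      require(requestInfos.size == 8)
      requestInfos.toSeq.sortBy(p => (p._1.topic, p._1.partitionId)).foreach(println)
    }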
TopicPartition("topic1", 0) -> topic1Broker1Data, + TopicPartition("topic2", 0) -> topic2Broker1Data), 1 -> Map( - ("topic1", 1) -> topic1Broker2Data, - ("topic2", 1) -> topic2Broker2Data) + TopicPartition("topic1", 1) -> topic1Broker2Data, + TopicPartition("topic2", 1) -> topic2Broker2Data) )) val actualResult = handler.partitionAndCollate(producerDataList) @@ -344,7 +344,7 @@ class AsyncProducerTest extends JUnit3Suite { partitionedDataOpt match { case Some(partitionedData) => for ((brokerId, dataPerBroker) <- partitionedData) { - for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker) + for ( (TopicPartition(topic, partitionId), dataPerTopic) <- dataPerBroker) assertTrue(partitionId == 0) } case None => @@ -408,10 +408,12 @@ class AsyncProducerTest extends JUnit3Suite { // entirely. The second request will succeed for partition 1 but fail for partition 0. // On the third try for partition 0, let it succeed. val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), TestUtils.messagesToSet(msgs), 0) - val response1 = - new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L)) + val response1 = ProducerResponse(ProducerRequest.CurrentVersion, 0, + Map((TopicPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)), + (TopicPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) val request2 = TestUtils.produceRequest(topic1, 0, TestUtils.messagesToSet(msgs)) - val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)) + val response2 = ProducerResponse(ProducerRequest.CurrentVersion, 0, + Map((TopicPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1) diff --git core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 068bf7c..14555e4 100644 --- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -21,14 +21,14 @@ import java.net.SocketTimeoutException import java.util.Properties import junit.framework.Assert import kafka.admin.CreateTopicCommand -import kafka.common.{ErrorMapping, MessageSizeTooLargeException} import kafka.integration.KafkaServerTestHarness import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, Message, ByteBufferMessageSet} import kafka.server.KafkaConfig import kafka.utils.{TestZKUtils, SystemTime, TestUtils} import org.junit.Test import org.scalatest.junit.JUnit3Suite -import kafka.api.TopicData +import kafka.api.{ProducerResponseStatus, PartitionData} +import kafka.common.{TopicPartition, ErrorMapping, MessageSizeTooLargeException} class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { private var messageBytes = new Array[Byte](2); @@ -85,11 +85,11 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs val ack = SyncProducerConfig.DefaultRequiredAcks - val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]()) + val emptyRequest = new 
diff --git core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 068bf7c..14555e4 100644
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -21,14 +21,14 @@ import java.net.SocketTimeoutException
 import java.util.Properties
 import junit.framework.Assert
 import kafka.admin.CreateTopicCommand
-import kafka.common.{ErrorMapping, MessageSizeTooLargeException}
 import kafka.integration.KafkaServerTestHarness
 import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, Message, ByteBufferMessageSet}
 import kafka.server.KafkaConfig
 import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
-import kafka.api.TopicData
+import kafka.api.{ProducerResponseStatus, PartitionData}
+import kafka.common.{TopicPartition, ErrorMapping, MessageSizeTooLargeException}
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes = new Array[Byte](2);
@@ -85,11 +85,11 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]())
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicPartition, PartitionData]())
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)
-    Assert.assertTrue(response.errorCode == ErrorMapping.NoError && response.errors.size == 0 && response.offsets.size == 0)
+    Assert.assertTrue(!response.hasError && response.status.size == 0)
   }
 
   @Test
@@ -152,10 +152,12 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
 
     Assert.assertNotNull(response)
     Assert.assertEquals(request.correlationId, response.correlationId)
-    Assert.assertEquals(response.errors.length, response.offsets.length)
-    Assert.assertEquals(3, response.errors.length)
-    response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, _))
-    response.offsets.foreach(Assert.assertEquals(-1L, _))
+    Assert.assertEquals(3, response.status.size)
+    response.status.values.foreach {
+      case ProducerResponseStatus(error, nextOffset) =>
+        Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, error)
+        Assert.assertEquals(-1L, nextOffset)
+    }
 
     // #2 - test that we get correct offsets when partition is owned by broker
     CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
@@ -166,18 +168,18 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val response2 = producer.send(request)
     Assert.assertNotNull(response2)
     Assert.assertEquals(request.correlationId, response2.correlationId)
-    Assert.assertEquals(response2.errors.length, response2.offsets.length)
-    Assert.assertEquals(3, response2.errors.length)
+    Assert.assertEquals(3, response2.status.size)
 
     // the first and last message should have been accepted by broker
-    Assert.assertEquals(0, response2.errors(0))
-    Assert.assertEquals(0, response2.errors(2))
-    Assert.assertEquals(messages.sizeInBytes, response2.offsets(0))
-    Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
+    Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicPartition("topic1", 0)).error)
+    Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicPartition("topic3", 0)).error)
+    Assert.assertEquals(messages.sizeInBytes, response2.status(TopicPartition("topic1", 0)).nextOffset)
+    Assert.assertEquals(messages.sizeInBytes, response2.status(TopicPartition("topic3", 0)).nextOffset)
 
     // the middle message should have been rejected because broker doesn't lead partition
-    Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, response2.errors(1))
-    Assert.assertEquals(-1, response2.offsets(1))
+    Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort,
+                        response2.status(TopicPartition("topic2", 0)).error)
+    Assert.assertEquals(-1, response2.status(TopicPartition("topic2", 0)).nextOffset)
   }
 
   @Test
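
The reworked SyncProducerTest assertions above look up each partition's status by TopicPartition key instead of indexing parallel errors/offsets arrays, so a reordered response can no longer silently pass. A standalone sketch of that assertion style (stand-in classes and values, purely illustrative):

    case class TopicPartition(topic: String, partitionId: Int)
    case class ProducerResponseStatus(error: Short, nextOffset: Long)

    object AssertSketch extends App {
      val NoError: Short = 0
      val status = Map(
        TopicPartition("topic1", 0) -> ProducerResponseStatus(NoError, 64L),
        TopicPartition("topic2", 0) -> ProducerResponseStatus(1.toShort, -1L))

      // keyed lookups: the assertion names the partition it is checking
      assert(status(TopicPartition("topic1", 0)).error == NoError)
      assert(status(TopicPartition("topic2", 0)).nextOffset == -1L)
      println("assertions passed")
    }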
diff --git core/src/test/scala/unit/kafka/utils/TestUtils.scala core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bbaa7e8..407767c 100644
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -33,10 +33,10 @@ import collection.mutable.ListBuffer
 import kafka.consumer.ConsumerConfig
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.TimeUnit
-import kafka.common.ErrorMapping
 import kafka.api._
-import collection.mutable.{Map, Set}
+import collection.mutable.Map
 import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
+import kafka.common.TopicPartition
 
 /**
@@ -366,8 +366,10 @@ object TestUtils extends Logging {
     val correlationId = SyncProducerConfig.DefaultCorrelationId
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
-    val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray))
-    new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, data.toArray)
+    val data = topics.flatMap(topic =>
+      partitions.map(partition => (TopicPartition(topic, partition), new PartitionData(partition, message)))
+    )
+    new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
   }
 
   def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
@@ -382,10 +384,8 @@ object TestUtils extends Logging {
     val clientId = "test"
     val requiredAcks: Short = 0
     val ackTimeoutMs = 0
-    var data = new Array[TopicData](1)
-    var partitionData = new Array[PartitionData](1)
-    partitionData(0) = new PartitionData(partition,message.underlying)
-    data(0) = new TopicData(topic,partitionData)
+    val data = new java.util.HashMap[TopicPartition, PartitionData]()
+    data.put(TopicPartition(topic, partition), new PartitionData(partition, message.underlying))
     val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
     pr
   }
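
One fix worth flagging in the hunk above: the added line originally keyed the map with the literal TopicPartition("topic", 0) instead of the method's topic and partition arguments, which would have sent every java-api test request to the same key; the corrected line uses TopicPartition(topic, partition). As a closing illustration, the produceRequest helper now builds its payload as a topic-by-partition cross product keyed by TopicPartition. A standalone sketch of that construction (String stands in for the real PartitionData payload; the case class is a local stand-in):

    case class TopicPartition(topic: String, partitionId: Int)

    object RequestDataSketch extends App {
      val topics = List("topic1", "topic2")
      val partitions = List(0, 1)
      val message = "msg"

      // one entry per (topic, partition) pair, same shape as the helper above
      val data: Map[TopicPartition, String] =
        topics.flatMap(topic =>
          partitions.map(partition => TopicPartition(topic, partition) -> message)).toMap

      assert(data.size == topics.size * partitions.size)
      data.foreach(println)
    }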