diff --git core/src/main/scala/kafka/api/ProducerRequest.scala core/src/main/scala/kafka/api/ProducerRequest.scala
index a350c6a..95683b6 100644
--- core/src/main/scala/kafka/api/ProducerRequest.scala
+++ core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -20,36 +20,36 @@
 package kafka.api
 
 import java.nio._
 import kafka.message._
 import kafka.utils._
+import collection.immutable.SortedMap
 
 object ProducerRequest {
   val CurrentVersion: Short = 0
+  val DefaultCharset = "UTF-8"
 
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
     val versionId: Short = buffer.getShort
     val correlationId: Int = buffer.getInt
-    val clientId: String = Utils.readShortString(buffer, "UTF-8")
+    val clientId: String = Utils.readShortString(buffer, DefaultCharset)
     val requiredAcks: Short = buffer.getShort
     val ackTimeoutMs: Int = buffer.getInt
     //build the topic structure
     val topicCount = buffer.getInt
-    val data = new Array[TopicData](topicCount)
-    for(i <- 0 until topicCount) {
-      val topic = Utils.readShortString(buffer, "UTF-8")
-
+    val partitionDataPairs = (1 to topicCount).flatMap(_ => {
+      // process topic
+      val topic = Utils.readShortString(buffer, DefaultCharset)
       val partitionCount = buffer.getInt
-      //build the partition structure within this topic
-      val partitionData = new Array[PartitionData](partitionCount)
-      for (j <- 0 until partitionCount) {
+      (1 to partitionCount).map(_ => {
         val partition = buffer.getInt
         val messageSetSize = buffer.getInt
         val messageSetBuffer = new Array[Byte](messageSetSize)
         buffer.get(messageSetBuffer,0,messageSetSize)
-        partitionData(j) = new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
-      }
-      data(i) = new TopicData(topic,partitionData)
-    }
-    new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
+        ((topic, partition), new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))))
+      })
+    })
+
+    new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs,
+      SortedMap(partitionDataPairs:_*))
  }
 }
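A note on the keying above: the request payload is now a flat SortedMap keyed by (topic, partition), and the sort relies on the implicit Ordering for (String, Int), so entries iterate by topic and then by partition, which keeps the serialized layout deterministic. A minimal sketch of that ordering (topic names and the Int payload standing in for PartitionData are made up for illustration):

    import scala.collection.immutable.SortedMap

    object KeyOrderingSketch extends App {
      // Toy payload: Int stands in for PartitionData.
      val data = SortedMap(
        ("topicB", 0) -> 1,
        ("topicA", 2) -> 2,
        ("topicA", 0) -> 3)
      // Prints (topicA,0), (topicA,2), (topicB,0): sorted by topic, then partition.
      data.keys.foreach(println)
    }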
@@ -58,58 +58,70 @@
 case class ProducerRequest( versionId: Short,
                             clientId: String,
                             requiredAcks: Short,
                             ackTimeoutMs: Int,
-                            data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) {
+                            data: SortedMap[(String, Int), PartitionData])
+  extends RequestOrResponse(Some(RequestKeys.Produce)) {
+
+  /**
+   * Partitions the data into a map of maps (one for each topic).
+   */
+  private def dataGroupedByTopic = data.groupBy(kv => { kv._1._1 })
 
-  def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) =
+  def this(correlationId: Int,
+           clientId: String,
+           requiredAcks: Short,
+           ackTimeoutMs: Int,
+           data: SortedMap[(String, Int), PartitionData]) =
    this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 
   def writeTo(buffer: ByteBuffer) {
+
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
-    Utils.writeShortString(buffer, clientId, "UTF-8")
+    Utils.writeShortString(buffer, clientId, ProducerRequest.DefaultCharset)
     buffer.putShort(requiredAcks)
     buffer.putInt(ackTimeoutMs)
+
     //save the topic structure
-    buffer.putInt(data.size) //the number of topics
-    for(topicData <- data) {
-      Utils.writeShortString(buffer, topicData.topic, "UTF-8") //write the topic
-      buffer.putInt(topicData.partitionDataArray.size) //the number of partitions
-      for(partitionData <- topicData.partitionDataArray) {
+    val groupedData = dataGroupedByTopic
+    buffer.putInt(groupedData.size) //the number of topics
+    groupedData.foreach(topicAndData => {
+      val (topic, topicAndPartitionData) = topicAndData
+      Utils.writeShortString(buffer, topic, ProducerRequest.DefaultCharset) //write the topic
+      buffer.putInt(topicAndPartitionData.size) //the number of partitions
+      topicAndPartitionData.foreach(partitionAndData => {
+        val partitionData = partitionAndData._2
         buffer.putInt(partitionData.partition)
         buffer.putInt(partitionData.messages.getSerialized().limit)
         buffer.put(partitionData.messages.getSerialized())
         partitionData.messages.getSerialized().rewind
-      }
-    }
+
+      })
+    })
   }
 
   def sizeInBytes: Int = {
-    var size = 0
-    //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
-    size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4
-    for(topicData <- data) {
-      size += 2 + topicData.topic.length + 4
-      for(partitionData <- topicData.partitionDataArray) {
-        size += 4 + 4 + partitionData.messages.sizeInBytes.asInstanceOf[Int]
-      }
-    }
-    size
-  }
+    val groupedData = dataGroupedByTopic
 
-  // need to override case-class equals due to broken java-array equals()
-  override def equals(other: Any): Boolean = {
-    other match {
-      case that: ProducerRequest =>
-        ( correlationId == that.correlationId &&
-          clientId == that.clientId &&
-          requiredAcks == that.requiredAcks &&
-          ackTimeoutMs == that.ackTimeoutMs &&
-          data.toSeq == that.data.toSeq )
-      case _ => false
-    }
+    2 + /* versionId */
+    4 + /* correlationId */
+    Utils.shortStringLength(clientId, ProducerRequest.DefaultCharset) + /* client id */
+    2 + /* requiredAcks */
+    4 + /* ackTimeoutMs */
+    4 + /* number of topics */
+    groupedData.foldLeft(0)((foldedTopics, currTopic) => {
+      foldedTopics +
+      Utils.shortStringLength(currTopic._1, ProducerRequest.DefaultCharset) +
+      4 + /* the number of partitions */
+      {
+        currTopic._2.foldLeft(0)((foldedPartitions, currPartition) => {
+          foldedPartitions +
+          4 + /* partition id */
+          4 + /* byte-length of serialized messages */
+          currPartition._2.messages.sizeInBytes.toInt
+        })
+      }
+    })
   }
 
-  def topicPartitionCount = data.foldLeft(0)(_ + _.partitionDataArray.length)
-
 }
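dataGroupedByTopic above regroups the flat map by the topic half of each key, so writeTo and sizeInBytes can still emit a per-topic header followed by that topic's partitions. A small sketch of the shape this produces (toy values, Int standing in for PartitionData):

    import scala.collection.immutable.SortedMap

    object GroupByTopicSketch extends App {
      val data = SortedMap(("a", 0) -> 10, ("a", 1) -> 11, ("b", 0) -> 20)
      // Same grouping as dataGroupedByTopic: topic -> map of (topic, partition) -> value.
      val grouped = data.groupBy(kv => kv._1._1)
      // e.g. a has 2 partitions, b has 1 (the outer result is a plain Map, so topic order is not guaranteed).
      grouped.foreach { case (topic, partitions) =>
        println("%s -> %d partitions".format(topic, partitions.size))
      }
    }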
diff --git core/src/main/scala/kafka/api/ProducerResponse.scala core/src/main/scala/kafka/api/ProducerResponse.scala
index 06e8625..5b254a9 100644
--- core/src/main/scala/kafka/api/ProducerResponse.scala
+++ core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -19,43 +19,79 @@
 package kafka.api
 
 import java.nio.ByteBuffer
 import kafka.common.ErrorMapping
+import collection.immutable.SortedMap
+import kafka.utils.Utils
 
 object ProducerResponse {
+  val DefaultCharset = "UTF-8"
+
   def readFrom(buffer: ByteBuffer): ProducerResponse = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
     val errorCode = buffer.getShort
-    val errorsSize = buffer.getInt
-    val errors = new Array[Short](errorsSize)
-    for( i <- 0 until errorsSize) {
-      errors(i) = buffer.getShort
-    }
-    val offsetsSize = buffer.getInt
-    val offsets = new Array[Long](offsetsSize)
-    for( i <- 0 until offsetsSize) {
-      offsets(i) = buffer.getLong
-    }
-    new ProducerResponse(versionId, correlationId, errors, offsets, errorCode)
+    val topicCount = buffer.getInt
+    val statusPairs = (1 to topicCount).flatMap(_ => {
+      val topic = Utils.readShortString(buffer, DefaultCharset)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partition = buffer.getInt
+        val error = buffer.getShort
+        val offset = buffer.getLong
+        ((topic, partition), (error, offset))
+      })
+    })
+
+    new ProducerResponse(versionId, correlationId, SortedMap(statusPairs:_*), errorCode)
   }
 }
 
-case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short],
-                            offsets: Array[Long], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
-  val sizeInBytes = 2 + 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length)
+case class ProducerResponse(versionId: Short,
+                            correlationId: Int,
+                            status: SortedMap[(String, Int), (Short, Long)],
+                            errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
+
+  val statusGroupedByTopic = status.groupBy(kv => kv._1._1)
+
+  val sizeInBytes = {
+    val groupedStatus = statusGroupedByTopic
+    2 + /* version id */
+    4 + /* correlation id */
+    2 + /* error code */
+    4 + /* topic count */
+    groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
+      foldedTopics +
+      Utils.shortStringLength(currTopic._1, ProducerResponse.DefaultCharset) +
+      4 + /* partition count for this topic */
+      currTopic._2.foldLeft (0) ((foldedPartitions, currPartition) => {
+        foldedPartitions +
+        4 + /* partition id */
+        2 + /* error code */
+        8 /* offset */
+      })
+    })
+  }
 
   def writeTo(buffer: ByteBuffer) {
-    /* version id */
+    val groupedStatus = statusGroupedByTopic
+
     buffer.putShort(versionId)
-    /* correlation id */
     buffer.putInt(correlationId)
-    /* error code */
     buffer.putShort(errorCode)
-    /* errors */
-    buffer.putInt(errors.length)
-    errors.foreach(buffer.putShort(_))
-    /* offsets */
-    buffer.putInt(offsets.length)
-    offsets.foreach(buffer.putLong(_))
+    /* topic count */
+    buffer.putInt(groupedStatus.size)
+
+    groupedStatus.foreach(topicStatus => {
+      val (topic, errorsAndOffsets) = topicStatus
+      Utils.writeShortString(buffer, topic, ProducerResponse.DefaultCharset)
+      /* partition count */
+      buffer.putInt(errorsAndOffsets.size)
+      errorsAndOffsets.foreach(partitionStatus => {
+        val (partitionId, error, offset) = (partitionStatus._1._2, partitionStatus._2._1, partitionStatus._2._2)
+        buffer.putInt(partitionId)
+        buffer.putShort(error)
+        buffer.putLong(offset)
+      })
+    })
   }
 }
\ No newline at end of file
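Both ProducerRequest and ProducerResponse now keep the size bookkeeping by hand next to writeTo, so one cheap sanity check is that sizeInBytes matches the number of bytes actually written. A hedged sketch of such a check against the ProducerResponse defined above (toy status values):

    import java.nio.ByteBuffer
    import scala.collection.immutable.SortedMap

    object ResponseSizeSketch extends App {
      // Assumes the kafka.api.ProducerResponse introduced by this patch.
      val response = new kafka.api.ProducerResponse(0, 1,
        SortedMap(("topicA", 0) -> (0.toShort, 42L)))
      val buffer = ByteBuffer.allocate(response.sizeInBytes)
      response.writeTo(buffer)
      // The declared size should equal the number of bytes actually written.
      assert(buffer.position == response.sizeInBytes)
    }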
diff --git core/src/main/scala/kafka/javaapi/ProducerRequest.scala core/src/main/scala/kafka/javaapi/ProducerRequest.scala
index 77e07d7..2052209 100644
--- core/src/main/scala/kafka/javaapi/ProducerRequest.scala
+++ core/src/main/scala/kafka/javaapi/ProducerRequest.scala
@@ -16,15 +16,16 @@
  */
 package kafka.javaapi
 
-import kafka.api.RequestOrResponse
-import kafka.api.{RequestKeys, TopicData}
 import java.nio.ByteBuffer
+import kafka.api.{PartitionData, RequestOrResponse, RequestKeys}
+import collection.immutable.SortedMap
 
 class ProducerRequest(val correlationId: Int,
                       val clientId: String,
                       val requiredAcks: Short,
                       val ackTimeoutMs: Int,
-                      val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.Produce)) {
+                      val data: SortedMap[(String, Int), PartitionData])
+  extends RequestOrResponse(Some(RequestKeys.Produce)) {
 
   val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 
diff --git core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
index c1ff168..3c34164 100644
--- core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
+++ core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
@@ -18,7 +18,8 @@
 package kafka.javaapi.producer
 
 import kafka.producer.SyncProducerConfig
 import kafka.javaapi.message.ByteBufferMessageSet
-import kafka.api.{ProducerResponse, PartitionData, TopicData}
+import kafka.api.{ProducerResponse, PartitionData}
+import collection.immutable.SortedMap
 
 class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
 
@@ -31,8 +32,7 @@ class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
   }
 
   def send(topic: String, messages: ByteBufferMessageSet): ProducerResponse = {
-    val partitionData = Array[PartitionData]( new PartitionData(-1, messages.underlying) )
-    val data = Array[TopicData]( new TopicData(topic, partitionData) )
+    val data = SortedMap((topic, -1) -> new PartitionData(-1, messages.underlying))
     val producerRequest = new kafka.api.ProducerRequest(-1, "", 0, 0, data)
     underlying.send(producerRequest)
   }
diff --git core/src/main/scala/kafka/producer/SyncProducer.scala core/src/main/scala/kafka/producer/SyncProducer.scala
index f22e517..83aa6aa 100644
--- core/src/main/scala/kafka/producer/SyncProducer.scala
+++ core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -101,13 +101,12 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    * Send a message
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
-    for( topicData <- producerRequest.data ) {
-      for( partitionData <- topicData.partitionDataArray ) {
-        verifyMessageSize(partitionData.messages)
-        val setSize = partitionData.messages.sizeInBytes.asInstanceOf[Int]
-        trace("Got message set with " + setSize + " bytes to send")
-      }
-    }
+    producerRequest.data.foreach( topicAndPartitionData => {
+      val partitionData = topicAndPartitionData._2
+      verifyMessageSize(partitionData.messages)
+      val setSize = partitionData.messages.sizeInBytes.asInstanceOf[Int]
+      trace("Got message set with " + setSize + " bytes to send")
+    })
     val response = doSend(producerRequest)
     ProducerResponse.readFrom(response.buffer)
   }
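One behavioral difference worth noting for the request builders above and the DefaultEventHandler change that follows: duplicate (topic, partition) entries, which the old Array[TopicData] payload could carry, now collapse to a single map entry (the last value wins). A tiny illustration with toy values:

    import scala.collection.immutable.SortedMap

    object DuplicateKeySketch extends App {
      // Two pairs share the key ("t", 0); only the second survives in the map.
      val pairs = Seq(("t", 0) -> "first", ("t", 0) -> "second", ("t", 1) -> "other")
      val data = SortedMap(pairs: _*)
      println(data.size)      // 2
      println(data(("t", 0))) // second
    }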
diff --git core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 62efc94..4f16579 100644
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -23,9 +23,10 @@
 import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
 import kafka.producer._
 import kafka.serializer.Encoder
 import kafka.utils.{Utils, Logging}
-import scala.collection.Map
+import scala.collection.{Map, Seq}
 import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.api.{TopicMetadata, ProducerRequest, TopicData, PartitionData}
+import kafka.api.{TopicMetadata, ProducerRequest, PartitionData}
+import collection.immutable.SortedMap
 
 class DefaultEventHandler[K,V](config: ProducerConfig,
 
@@ -173,30 +174,25 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
    */
   private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Seq[(String, Int)] = {
     if(brokerId < 0) {
-      warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
+      warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
       messagesPerTopic.keys.toSeq
     } else if(messagesPerTopic.size > 0) {
-      val topics = new HashMap[String, ListBuffer[PartitionData]]()
-      for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) {
-        val partitionData = topics.getOrElseUpdate(topicName, new ListBuffer[PartitionData]())
-        partitionData.append(new PartitionData(partitionId, messagesSet))
-      }
-      val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)).toArray
+      val topicPartitionDataPairs = messagesPerTopic.toSeq.map(topicAndMessages => {
+        val (topic, partitionId) = topicAndMessages._1
+        ((topic, partitionId), new PartitionData(partitionId, topicAndMessages._2))
+      })
       val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
-        config.requestTimeoutMs, topicData)
+        config.requestTimeoutMs, SortedMap(topicPartitionDataPairs:_*))
       try {
         val syncProducer = producerPool.getProducer(brokerId)
         val response = syncProducer.send(producerRequest)
-        trace("producer sent messages for topics %s to broker %d on %s:%d"
+        trace("Producer sent messages for topics %s to broker %d on %s:%d"
          .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
-        var msgIdx = -1
-        val errors = new ListBuffer[(String, Int)]
-        for( topic <- topicData; partition <- topic.partitionDataArray ) {
-          msgIdx += 1
-          if(msgIdx > response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError)
-            errors.append((topic.topic, partition.partition))
-        }
-        errors
+        if (response.status.size != producerRequest.data.size)
+          throw new KafkaException("Incomplete response (%s) for producer request (%s)"
+            .format(response, producerRequest))
+        response.status.filter(_._2._1 != ErrorMapping.NoError).toSeq
+          .map(partitionStatus => partitionStatus._1)
       } catch {
         case e =>
           warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), e)
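In the send() change above, the retry list is derived from the response status map instead of walking parallel error and offset arrays. A small sketch of that filter with toy status values (error code 3 is just a stand-in, not the real ErrorMapping constant):

    import scala.collection.immutable.SortedMap

    object FailedPartitionsSketch extends App {
      val NoError: Short = 0
      // (error, offset) per (topic, partition); toy values.
      val status = SortedMap(
        ("a", 0) -> (NoError, 10L),
        ("a", 1) -> (3.toShort, -1L),
        ("b", 0) -> (NoError, 7L))
      // Keep only the keys whose error code is not NoError.
      val failed = status.filter(_._2._1 != NoError).keys.toSeq
      println(failed) // only (a,1) remains
    }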
diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala
index 5d5c6d6..f8365b8 100644
--- core/src/main/scala/kafka/server/KafkaApis.scala
+++ core/src/main/scala/kafka/server/KafkaApis.scala
@@ -138,14 +138,14 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   }
 
   /**
-   * Check if the partitionDataArray from a produce request can unblock any
+   * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.
    */
-  def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) {
-    var satisfied = new mutable.ArrayBuffer[DelayedFetch]
-    for(partitionData <- partitionDatas)
-      satisfied ++= fetchRequestPurgatory.update(RequestKey(topic, partitionData.partition), null)
-    trace("Producer request to %s unblocked %d fetch requests.".format(topic, satisfied.size))
+  def maybeUnblockDelayedFetchRequests(topic: String, partitionData: PartitionData) {
+    val partition = partitionData.partition
+    val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), null)
+    trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
+
     // send any newly unblocked responses
     for(fetchReq <- satisfied) {
       val topicData = readMessageSets(fetchReq.fetch)
@@ -169,28 +169,23 @@
     requestLogger.trace("Handling producer request " + request.toString)
     trace("Handling producer request " + request.toString)
 
-    val response = produceToLocalLog(produceRequest)
+    val localProduceResponse = produceToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
-
-    for (topicData <- produceRequest.data)
-      maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)
-
+
+    produceRequest.data.foreach(partitionAndData =>
+      maybeUnblockDelayedFetchRequests(partitionAndData._1._1, partitionAndData._2))
+
     if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1 || produceRequest.data.size <= 0)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(localProduceResponse)))
     else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val producerRequestKeys = produceRequest.data.flatMap(topicData => {
-        val topic = topicData.topic
-        topicData.partitionDataArray.map(partitionData => {
-          RequestKey(topic, partitionData.partition)
-        })
-      })
+      val producerRequestKeys = produceRequest.data.keys.map(
+        topicAndPartition => RequestKey(topicAndPartition._1, topicAndPartition._2)).toSeq
 
       val delayedProduce = new DelayedProduce(
-        producerRequestKeys, request,
-        response.errors, response.offsets,
+        producerRequestKeys, request, localProduceResponse,
         produceRequest, produceRequest.ackTimeoutMs.toLong)
 
       producerRequestPurgatory.watch(delayedProduce)
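With the flat keying, the delayed-produce watch keys in handleProducerRequest fall directly out of data.keys. A minimal sketch of that mapping (RequestKeyStub stands in for KafkaApis' private RequestKey case class):

    import scala.collection.immutable.SortedMap

    object RequestKeysSketch extends App {
      // Stand-in for the private RequestKey(topic, partition) in KafkaApis.
      case class RequestKeyStub(topic: String, partition: Int)
      val data = SortedMap(("a", 0) -> "msgs", ("a", 1) -> "msgs", ("b", 0) -> "msgs")
      val keys = data.keys.map(tp => RequestKeyStub(tp._1, tp._2)).toSeq
      println(keys) // RequestKeyStub(a,0), RequestKeyStub(a,1), RequestKeyStub(b,0)
    }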
@@ -214,42 +209,42 @@
    */
   private def produceToLocalLog(request: ProducerRequest): ProducerResponse = {
     trace("Produce [%s] to local log ".format(request.toString))
-    val requestSize = request.topicPartitionCount
-    val errors = new Array[Short](requestSize)
-    val offsets = new Array[Long](requestSize)
-
-    var msgIndex = -1
-    for(topicData <- request.data) {
-      for(partitionData <- topicData.partitionDataArray) {
-        msgIndex += 1
-        BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes)
-        BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes)
-        try {
-          kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
-          val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
-          log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
-          replicaManager.recordLeaderLogEndOffset(topicData.topic, partitionData.partition, log.logEndOffset)
-          offsets(msgIndex) = log.logEndOffset
-          errors(msgIndex) = ErrorMapping.NoError.toShort
-          trace("%d bytes written to logs, nextAppendOffset = %d"
-            .format(partitionData.messages.sizeInBytes, offsets(msgIndex)))
-        } catch {
-          case e =>
-            BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
-            BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
-            error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e)
-            e match {
-              case _: IOException =>
-                fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
-                System.exit(1)
-              case _ =>
-                errors(msgIndex) = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort
-                offsets(msgIndex) = -1
-            }
-        }
+
+    val localErrorsAndOffsets = request.data.map(topicData => {
+      val (topic, partitionData) = (topicData._1._1, topicData._2)
+
+      val messagesSize = partitionData.messages.sizeInBytes
+      BrokerTopicStat.getBrokerTopicStat(topic).recordBytesIn(messagesSize)
+      BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(messagesSize)
+      try {
+        kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partitionData.partition)
+        val log = logManager.getOrCreateLog(topic, partitionData.partition)
+        log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+        replicaManager.recordLeaderLogEndOffset(topic, partitionData.partition, log.logEndOffset)
+        val (error, offset) = (ErrorMapping.NoError.toShort, log.logEndOffset)
+        trace("%d bytes written to logs, nextAppendOffset = %d"
+          .format(partitionData.messages.sizeInBytes, offset))
+        ((topic, partitionData.partition), (error, offset))
+      } catch {
+        case e =>
+          BrokerTopicStat.getBrokerTopicStat(topic).recordFailedProduceRequest
+          BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
+          error("Error processing ProducerRequest on %s:%d".format(topic, partitionData.partition), e)
+          e match {
+            case _: IOException =>
+              fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
+              // Compiler prefers scala.sys.exit to System.exit.
+              exit(1)
+            case _ =>
+              val (error, offset) = (ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort, -1L)
+              ((topic, partitionData.partition), (error, offset))
+          }
       }
-    }
-    new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
+    })
+
+    new ProducerResponse(request.versionId, request.correlationId, localErrorsAndOffsets)
   }
 
   /**
@@ -501,6 +496,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
   private [kafka] case class RequestKey(topic: String, partition: Int) extends MetricKey {
     override def keyLabel = "%s-%d".format(topic, partition)
+    def topicPartition = (topic, partition)
   }
 
   /**
    * A delayed fetch request
    */
@@ -537,48 +533,44 @@
 class DelayedProduce(keys: Seq[RequestKey],
                      request: RequestChannel.Request,
-                     localErrors: Array[Short],
-                     requiredOffsets: Array[Long],
+                     localProduceResponse: ProducerResponse,
                      val produce: ProducerRequest,
                      delayMs: Long)
   extends DelayedRequest(keys, request, delayMs) with Logging {
 
+  private val initialErrorsAndOffsets = localProduceResponse.status
+
   /**
    * Map of (topic, partition) -> partition status
    * The values in this map don't need to be synchronized since updates to the
    * values are effectively synchronized by the ProducerRequestPurgatory's
    * update method
    */
-  private [kafka] val partitionStatus = keys.map(key => {
-    val keyIndex = keys.indexOf(key)
+  private [kafka] val partitionStatus = keys.map(requestKey => {
+    val errorAndOffset = initialErrorsAndOffsets(requestKey.topicPartition)
     // if there was an error in writing to the local replica's log, then don't
     // wait for acks on this partition
-    val acksPending =
-      if (localErrors(keyIndex) == ErrorMapping.NoError) {
+    val (acksPending, error, offset) =
+      if (errorAndOffset._1 == ErrorMapping.NoError) {
         // Timeout error state will be cleared when requiredAcks are received
-        localErrors(keyIndex) = ErrorMapping.RequestTimedOutCode
-        true
+        (true, ErrorMapping.RequestTimedOutCode, errorAndOffset._2)
       }
-      else
-        false
+      else (false, errorAndOffset._1, errorAndOffset._2)
 
-    val initialStatus = new PartitionStatus(acksPending, localErrors(keyIndex), requiredOffsets(keyIndex))
-    trace("Initial partition status for %s = %s".format(key, initialStatus))
-    (key, initialStatus)
+    val initialStatus = new PartitionStatus(acksPending, error, offset)
+    trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus))
+    (requestKey, initialStatus)
   }).toMap
-
   def respond() {
-    val errorsAndOffsets: (List[Short], List[Long]) = (
-      keys.foldRight
-        ((List[Short](), List[Long]()))
-        ((key: RequestKey, result: (List[Short], List[Long])) => {
-          val status = partitionStatus(key)
-          (status.error :: result._1, status.requiredOffset :: result._2)
-        })
-    )
-    val response = new ProducerResponse(produce.versionId, produce.correlationId,
-      errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
+
+    val finalErrorsAndOffsets = initialErrorsAndOffsets.map(
+      status => {
+        val (topic, partition) = status._1
+        val pstat = partitionStatus(RequestKey(topic, partition))
+        (status._1, (pstat.error, pstat.requiredOffset))
+      })
+
+    val response = new ProducerResponse(produce.versionId, produce.correlationId, finalErrorsAndOffsets)
 
     requestChannel.sendResponse(new RequestChannel.Response(
       request, new BoundedByteBufferSend(response)))
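The initial partition status above turns a locally successful write into a pending ack with a provisional RequestTimedOutCode, cleared only once enough replicas acknowledge, while a failed local write is already final. A compact sketch of just that decision (the error codes here are stand-ins, not the real ErrorMapping values):

    object InitialStatusSketch extends App {
      // Stand-in codes; the real ones live in kafka.common.ErrorMapping.
      val NoError: Short = 0
      val RequestTimedOutCode: Short = 7
      // Mirrors DelayedProduce's initial status: (acksPending, error, offset).
      def initialStatus(localError: Short, localOffset: Long): (Boolean, Short, Long) =
        if (localError == NoError) (true, RequestTimedOutCode, localOffset)
        else (false, localError, localOffset)
      println(initialStatus(NoError, 100L))   // (true,7,100)
      println(initialStatus(5.toShort, -1L))  // (false,5,-1)
    }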
@@ -630,15 +622,11 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
           fetchPartitionStatus.acksPending = false
           fetchPartitionStatus.error = ErrorMapping.NoError
-          val topicData =
-            produce.data.find(_.topic == topic).get
-          val partitionData =
-            topicData.partitionDataArray.find(_.partition == partitionId).get
-          delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key,
-            durationNs,
-            partitionData.sizeInBytes)
-          maybeUnblockDelayedFetchRequests(
-            topic, Array(partitionData))
+          val partitionData = produce.data((topic, partitionId))
+          delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(
+            key, durationNs, partitionData.sizeInBytes)
+
+          maybeUnblockDelayedFetchRequests(topic, partitionData)
         }
       }
       else {
diff --git core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index fdfceec..80432a1 100644
--- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -34,6 +34,7 @@
 import org.scalatest.junit.JUnit3Suite
 import scala.collection.Map
 import scala.collection.mutable.ListBuffer
 import kafka.utils._
+import collection.immutable.SortedMap
 
 class AsyncProducerTest extends JUnit3Suite {
   val props = createBrokerConfigs(1)
@@ -409,9 +410,12 @@
     // On the third try for partition 0, let it succeed.
     val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), TestUtils.messagesToSet(msgs), 0)
     val response1 =
-      new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
+      new ProducerResponse(ProducerRequest.CurrentVersion, 0,
+        SortedMap((("topic1", 0), (ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
+                  (("topic1", 1), (ErrorMapping.NoError, 0L))))
     val request2 = TestUtils.produceRequest(topic1, 0, TestUtils.messagesToSet(msgs))
-    val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L))
+    val response2 = new ProducerResponse(
+      ProducerRequest.CurrentVersion, 0, SortedMap((("topic1", 0), (ErrorMapping.NoError, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
     EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
     EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1)
diff --git core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 29ef0d6..e658d80 100644
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -28,7 +28,8 @@
 import kafka.server.KafkaConfig
 import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
-import kafka.api.TopicData
+import collection.immutable.SortedMap
+import kafka.api.PartitionData
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes = new Array[Byte](2);
@@ -85,11 +86,11 @@
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]())
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, SortedMap[(String, Int), PartitionData]())
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)
-    Assert.assertTrue(response.errorCode == ErrorMapping.NoError && response.errors.size == 0 && response.offsets.size == 0)
+    Assert.assertTrue(response.errorCode == ErrorMapping.NoError && response.status.size == 0)
   }
 
   @Test
@@ -152,10 +153,12 @@
     Assert.assertNotNull(response)
     Assert.assertEquals(request.correlationId, response.correlationId)
-    Assert.assertEquals(response.errors.length, response.offsets.length)
-    Assert.assertEquals(3, response.errors.length)
-    response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, _))
-    response.offsets.foreach(Assert.assertEquals(-1L, _))
+    Assert.assertEquals(3, response.status.size)
+    response.status.foreach(partitionStatus => {
+      val (error, offset) = partitionStatus._2
+      Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, error)
+      Assert.assertEquals(-1L, offset)
+    })
 
     // #2 - test that we get correct offsets when partition is owned by broker
     CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
@@ -166,18 +169,17 @@
     val response2 = producer.send(request)
     Assert.assertNotNull(response2)
     Assert.assertEquals(request.correlationId, response2.correlationId)
-    Assert.assertEquals(response2.errors.length, response2.offsets.length)
-    Assert.assertEquals(3, response2.errors.length)
+    Assert.assertEquals(3, response2.status.size)
 
     // the first and last message should have been accepted by broker
-    Assert.assertEquals(0, response2.errors(0))
-    Assert.assertEquals(0, response2.errors(2))
-    Assert.assertEquals(messages.sizeInBytes, response2.offsets(0))
-    Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
+    Assert.assertEquals(0, response2.status("topic1", 0)._1)
+    Assert.assertEquals(0, response2.status("topic3", 0)._1)
+    Assert.assertEquals(messages.sizeInBytes, response2.status("topic1", 0)._2)
+    Assert.assertEquals(messages.sizeInBytes, response2.status("topic3", 0)._2)
 
     // the middle message should have been rejected because broker doesn't lead partition
-    Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, response2.errors(1))
-    Assert.assertEquals(-1, response2.offsets(1))
+    Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, response2.status("topic2", 0)._1)
+    Assert.assertEquals(-1, response2.status("topic2", 0)._2)
   }
 
   @Test
@@ -232,8 +234,8 @@
     Assert.assertNotNull(response)
     Assert.assertEquals(request.correlationId, response.correlationId)
-    Assert.assertEquals(response.errors.length, response.offsets.length)
-    Assert.assertEquals(3, response.errors.length)
-    response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, _))
+    Assert.assertEquals(3, response.status.size)
+    response.status.foreach(partitionStatus =>
+      Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, partitionStatus._2._1))
   }
 }
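A reading note for the assertions above: status is keyed by (String, Int), so a call like response2.status("topic1", 0) relies on the two arguments being adapted into a single tuple key, i.e. status(("topic1", 0)). A one-line illustration with a toy map:

    import scala.collection.immutable.SortedMap

    object TupledApplySketch extends App {
      val status = SortedMap(("topic1", 0) -> (0.toShort, 5L))
      // Both lookups hit the same ("topic1", 0) key; the second uses argument auto-tupling.
      assert(status(("topic1", 0)) == status("topic1", 0))
      println(status("topic1", 0)) // (0,5)
    }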
diff --git core/src/test/scala/unit/kafka/utils/TestUtils.scala core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 53505a3..5ce7ada 100644
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -37,6 +37,7 @@
 import kafka.common.ErrorMapping
 import kafka.api._
 import collection.mutable.{Map, Set}
 import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
+import collection.immutable.SortedMap
 
 
 /**
@@ -366,8 +367,10 @@ object TestUtils extends Logging {
     val correlationId = SyncProducerConfig.DefaultCorrelationId
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
-    val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray))
-    new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, data.toArray)
+    val data = topics.flatMap(topic =>
+      partitions.map(partition => ((topic, partition), new PartitionData(partition, message)))
+    )
+    new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, SortedMap(data:_*))
   }
 
   def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
@@ -382,10 +385,7 @@ object TestUtils extends Logging {
     val clientId = "test"
     val requiredAcks: Short = 0
     val ackTimeoutMs = 0
-    var data = new Array[TopicData](1)
-    var partitionData = new Array[PartitionData](1)
-    partitionData(0) = new PartitionData(partition,message.underlying)
-    data(0) = new TopicData(topic,partitionData)
+    val data = SortedMap((topic, partition) -> new PartitionData(partition,message.underlying))
     val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
     pr
   }