Index: core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala	(revision 1393767)
+++ core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala	(working copy)
@@ -35,19 +35,32 @@
   private val isr1 = List(0, 1, 2)
   private val leader2 = 0
   private val isr2 = List(0, 2, 3)
-  private val partitionData0 = new PartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes)))
-  private val partitionData1 = new PartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes)))
-  private val partitionData2 = new PartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes)))
-  private val partitionData3 = new PartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes)))
-  private val partitionDataArray = Array(partitionData0, partitionData1, partitionData2, partitionData3)
+  private val partitionDataFetchResponse0 = new PartitionDataFetchResponse(0, new ByteBufferMessageSet(new Message("first message".getBytes)))
+  private val partitionDataFetchResponse1 = new PartitionDataFetchResponse(1, new ByteBufferMessageSet(new Message("second message".getBytes)))
+  private val partitionDataFetchResponse2 = new PartitionDataFetchResponse(2, new ByteBufferMessageSet(new Message("third message".getBytes)))
+  private val partitionDataFetchResponse3 = new PartitionDataFetchResponse(3, new ByteBufferMessageSet(new Message("fourth message".getBytes)))
+  private val partitionDataFetchResponseArray = Array(partitionDataFetchResponse0, partitionDataFetchResponse1, partitionDataFetchResponse2, partitionDataFetchResponse3)
 
-  private val topicData = {
+  private val topicDataFetchResponse = {
     val groupedData = Array(topic1, topic2).flatMap(topic =>
-      partitionDataArray.map(partitionData =>
+      partitionDataFetchResponseArray.map(partitionData =>
         (TopicAndPartition(topic, partitionData.partition), partitionData)))
     collection.immutable.Map(groupedData:_*)
   }
 
+  private val partitionDataProducerRequest0 = new PartitionDataProducerRequest(0, new ByteBufferMessageSet(new Message("first message".getBytes)))
+  private val partitionDataProducerRequest1 = new PartitionDataProducerRequest(1, new ByteBufferMessageSet(new Message("second message".getBytes)))
+  private val partitionDataProducerRequest2 = new PartitionDataProducerRequest(2, new ByteBufferMessageSet(new Message("third message".getBytes)))
+  private val partitionDataProducerRequest3 = new PartitionDataProducerRequest(3, new ByteBufferMessageSet(new Message("fourth message".getBytes)))
+  private val partitionDataProducerRequestArray = Array(partitionDataProducerRequest0, partitionDataProducerRequest1, partitionDataProducerRequest2, partitionDataProducerRequest3)
+
+  private val topicDataProducerRequest = {
+    val groupedData = Array(topic1, topic2).flatMap(topic =>
+      partitionDataProducerRequestArray.map(partitionData =>
+        (TopicAndPartition(topic, partitionData.partition), partitionData)))
+    collection.immutable.Map(groupedData:_*)
+  }
+
   private val requestInfos = collection.immutable.Map(
     TopicAndPartition(topic1, 0) -> PartitionFetchInfo(1000, 100),
     TopicAndPartition(topic1, 1) -> PartitionFetchInfo(2000, 100),
@@ -92,7 +105,7 @@
   }
 
   def createTestProducerRequest: ProducerRequest = {
-    new ProducerRequest(1, "client 1", 0, 1000, topicData)
+    new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest)
   }
 
   def createTestProducerResponse: ProducerResponse =
@@ -106,7 +119,7 @@
   }
 
   def createTestFetchResponse: FetchResponse = {
-    FetchResponse(1, 1, topicData)
+    FetchResponse(1, 1, topicDataFetchResponse)
   }
 
   def createTestOffsetRequest = new OffsetRequest(
Index: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/network/SocketServerTest.scala	(revision 1393767)
+++ core/src/test/scala/unit/kafka/network/SocketServerTest.scala	(working copy)
@@ -25,7 +25,7 @@
 import java.util.Random
 import junit.framework.Assert._
 import kafka.producer.SyncProducerConfig
-import kafka.api.{PartitionData, ProducerRequest}
+import kafka.api.{PartitionDataProducerRequest, ProducerRequest}
 import java.nio.ByteBuffer
 import kafka.common.TopicAndPartition
 
@@ -78,7 +78,7 @@
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
     val emptyRequest =
-      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, PartitionData]())
+      new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, PartitionDataProducerRequest]())
 
     val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
     emptyRequest.writeTo(byteBuffer)
Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala	(revision 1393767)
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala	(working copy)
@@ -367,7 +367,7 @@
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val data = topics.flatMap(topic =>
-      partitions.map(partition => (TopicAndPartition(topic, partition), new PartitionData(partition, message)))
+      partitions.map(partition => (TopicAndPartition(topic, partition), new PartitionDataProducerRequest(partition, message)))
     )
     new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
   }
Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(revision 1393767)
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(working copy)
@@ -27,7 +27,7 @@
 import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
-import kafka.api.{ProducerResponseStatus, PartitionData}
+import kafka.api.{ProducerResponseStatus, PartitionDataProducerRequest}
 import kafka.common.{TopicAndPartition, ErrorMapping}
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -85,7 +85,7 @@
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, PartitionData]())
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, PartitionDataProducerRequest]())
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)
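Note (usage sketch, not part of the patch): after this rename, client and test code builds produce requests from PartitionDataProducerRequest values keyed by TopicAndPartition, as TestUtils.produceRequest above now does. A minimal construction against the patched kafka.api; the topic name, client id, and ack settings are illustrative:

import java.nio.ByteBuffer
import kafka.api.{PartitionDataProducerRequest, ProducerRequest}
import kafka.common.TopicAndPartition
import kafka.message.{ByteBufferMessageSet, Message}

object ProducerRequestSketch {
  def main(args: Array[String]) {
    // One partition's worth of messages for an illustrative topic/partition.
    val messages = new ByteBufferMessageSet(new Message("hello".getBytes))
    val data = Map(TopicAndPartition("test-topic", 0) -> new PartitionDataProducerRequest(0, messages))
    // correlationId = 1, clientId = "sketch", requiredAcks = 1, ackTimeoutMs = 1000
    val request = new ProducerRequest(1, "sketch", 1.toShort, 1000, data)
    // Serialize the request the same way SocketServerTest does above.
    val buffer = ByteBuffer.allocate(request.sizeInBytes)
    request.writeTo(buffer)
  }
}

The same typed map, left empty, is what SocketServerTest and SyncProducerTest use to exercise an empty request.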
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(revision 1393767)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(working copy)
@@ -24,7 +24,7 @@
 import kafka.utils.{Utils, Logging}
 import scala.collection.{Seq, Map}
 import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.api.{TopicMetadata, ProducerRequest, PartitionData}
+import kafka.api.{TopicMetadata, ProducerRequest, PartitionDataProducerRequest}
 
 
 class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -207,7 +207,7 @@
     } else if(messagesPerTopic.size > 0) {
       val topicPartitionDataPairs = messagesPerTopic.toSeq.map {
         case (topicAndPartition, messages) =>
-          (topicAndPartition, new PartitionData(topicAndPartition.partition, messages))
+          (topicAndPartition, new PartitionDataProducerRequest(topicAndPartition.partition, messages))
       }
       val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
         config.requestTimeoutMs, Map(topicPartitionDataPairs:_*))
Index: core/src/main/scala/kafka/network/RequestChannel.scala
===================================================================
--- core/src/main/scala/kafka/network/RequestChannel.scala	(revision 1393767)
+++ core/src/main/scala/kafka/network/RequestChannel.scala	(working copy)
@@ -30,7 +30,7 @@
   val AllDone = new Request(1, 2, getShutdownReceive(), 0)
 
   def getShutdownReceive() = {
-    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, PartitionData]())
+    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, PartitionDataProducerRequest]())
     val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
     byteBuffer.putShort(RequestKeys.ProduceKey)
     emptyProducerRequest.writeTo(byteBuffer)
Index: core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala	(revision 1393767)
+++ core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala	(working copy)
@@ -20,7 +20,7 @@
 import kafka.cluster.Broker
 import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, PartitionData}
+import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, PartitionDataFetchResponse}
 import kafka.common.TopicAndPartition
 
 
@@ -34,7 +34,7 @@
           minBytes = config.minFetchBytes) {
 
   // process fetched data
-  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {
+  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionDataFetchResponse) {
     val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionData.partition))
     if (pti.getFetchOffset != fetchOffset)
       throw new RuntimeException("offset doesn't match for topic %s partition: %d pti offset: %d fetch ofset: %d"
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1393767)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -96,7 +96,7 @@
    * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.
    */
-  def maybeUnblockDelayedFetchRequests(topic: String, partitionData: PartitionData) {
+  def maybeUnblockDelayedFetchRequests(topic: String, partitionData: PartitionDataProducerRequest) {
     val partition = partitionData.partition
     val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), null)
     trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
@@ -285,7 +285,7 @@
 
   /**
    * Read from all the offset details given and return a map of
-   * (topic, partition) -> PartitionData
+   * (topic, partition) -> PartitionDataFetchResponse
    */
   private def readMessageSets(fetchRequest: FetchRequest) = {
     val isFetchFromFollower = fetchRequest.isFromFollower
@@ -296,13 +296,13 @@
         BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
         BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
         if (!isFetchFromFollower) {
-          new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
+          new PartitionDataFetchResponse(partition, ErrorMapping.NoError, offset, highWatermark, messages)
         } else {
           debug("Leader %d for topic %s partition %d received fetch request from follower %d"
             .format(brokerId, topic, partition, fetchRequest.replicaId))
           debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
             .format(brokerId, messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
-          new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
+          new PartitionDataFetchResponse(partition, ErrorMapping.NoError, offset, highWatermark, messages)
         }
       }
       catch {
@@ -310,7 +310,7 @@
           BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
           BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
           error("error when processing request " + (topic, partition, offset, fetchSize), t)
-          new PartitionData(partition, ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
+          new PartitionDataFetchResponse(partition, ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
                             offset, -1L, MessageSet.Empty)
       }
       (TopicAndPartition(topic, partition), partitionData)
Index: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(revision 1393767)
+++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala	(working copy)
@@ -22,7 +22,7 @@
 import kafka.common.{TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder}
+import kafka.api.{FetchResponse, PartitionDataFetchResponse, FetchRequestBuilder}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
@@ -44,7 +44,7 @@
   // callbacks to be defined in subclass
 
   // process fetched data
-  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData)
+  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionDataFetchResponse)
 
   // handle a partition whose offset is out of range and return a new fetch offset
   def handleOffsetOutOfRange(topic: String, partitionId: Int): Long
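Note (usage sketch, not part of the patch): with this change the abstract processPartitionData callback above is typed to the fetch-side class, and both subclasses in this patch (ConsumerFetcherThread, ReplicaFetcherThread) follow suit. A conforming callback might look like the following; the error check and logging are illustrative, and ErrorMapping.exceptionFor is assumed from the existing kafka.common.ErrorMapping:

import kafka.api.PartitionDataFetchResponse
import kafka.common.ErrorMapping
import kafka.message.ByteBufferMessageSet

object FetchCallbackSketch {
  // Same shape as the AbstractFetcherThread callback above.
  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionDataFetchResponse) {
    // Surface broker-side errors carried in the response.
    if (partitionData.error != ErrorMapping.NoError)
      throw ErrorMapping.exceptionFor(partitionData.error)
    // The payload arrives as a ByteBufferMessageSet, as in ReplicaFetcherThread below.
    val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
    println("%s-%d: %d bytes fetched at offset %d, hw %d".format(
      topic, partitionData.partition, messageSet.sizeInBytes, fetchOffset, partitionData.hw))
  }
}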
Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(revision 1393767)
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(working copy)
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, PartitionData}
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, PartitionDataFetchResponse}
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
 import kafka.common.TopicAndPartition
@@ -30,7 +30,7 @@
                                 minBytes = brokerConfig.replicaMinBytes) {
 
   // process fetched data
-  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {
+  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionDataFetchResponse) {
     val partitionId = partitionData.partition
     val replica = replicaMgr.getReplica(topic, partitionId).get
     val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
Index: core/src/main/scala/kafka/api/FetchResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/FetchResponse.scala	(revision 1393767)
+++ core/src/main/scala/kafka/api/FetchResponse.scala	(working copy)
@@ -24,8 +24,8 @@
 import kafka.network.{MultiSend, Send}
 import kafka.utils.Utils
 
-object PartitionData {
-  def readFrom(buffer: ByteBuffer): PartitionData = {
+object PartitionDataFetchResponse extends {
+  def readFrom(buffer: ByteBuffer): PartitionDataFetchResponse = {
     val partition = buffer.getInt
     val error = buffer.getShort
     val initialOffset = buffer.getLong
@@ -34,7 +34,7 @@
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset))
+    new PartitionDataFetchResponse(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset))
   }
 
   val headerSize =
@@ -45,20 +45,20 @@
     4 /* messageSetSize */
 }
 
-case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
+case class PartitionDataFetchResponse(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
 
-  val sizeInBytes = PartitionData.headerSize + messages.sizeInBytes.intValue()
+  val sizeInBytes = PartitionDataFetchResponse.headerSize + messages.sizeInBytes.intValue()
 
   def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
 }
 
 // SENDS
 
-class PartitionDataSend(val partitionData: PartitionData) extends Send {
+class PartitionDataSend(val partitionData: PartitionDataFetchResponse) extends Send {
   private val messageSize = partitionData.messages.sizeInBytes
   private var messagesSentSize = 0L
 
-  private val buffer = ByteBuffer.allocate(PartitionData.headerSize)
+  private val buffer = ByteBuffer.allocate(PartitionDataFetchResponse.headerSize)
   buffer.putInt(partitionData.partition)
   buffer.putShort(partitionData.error)
   buffer.putLong(partitionData.initialOffset)
@@ -86,7 +86,7 @@
       val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
       val partitionCount = buffer.getInt
       val topicPartitionDataPairs = (1 to partitionCount).map(_ => {
-        val partitionData = PartitionData.readFrom(buffer)
+        val partitionData = PartitionDataFetchResponse.readFrom(buffer)
         (TopicAndPartition(topic, partitionData.partition), partitionData)
       })
       TopicData(topic, Map(topicPartitionDataPairs:_*))
@@ -97,7 +97,7 @@
     4 /* partition count */
 }
 
-case class TopicData(topic: String, partitionData: Map[TopicAndPartition, PartitionData]) {
+case class TopicData(topic: String, partitionData: Map[TopicAndPartition, PartitionDataFetchResponse]) {
 
   val sizeInBytes =
     TopicData.headerSize(topic) + partitionData.values.foldLeft(0)(_ + _.sizeInBytes)
@@ -158,7 +158,7 @@
 
 case class FetchResponse(versionId: Short,
                          correlationId: Int,
-                         data: Map[TopicAndPartition, PartitionData]) {
+                         data: Map[TopicAndPartition, PartitionDataFetchResponse]) {
 
   /**
    * Partitions the data into a map of maps (one for each topic).
@@ -173,7 +173,7 @@
         topicData.sizeInBytes
       })
 
-  private def partitionDataFor(topic: String, partition: Int): PartitionData = {
+  private def partitionDataFor(topic: String, partition: Int): PartitionDataFetchResponse = {
     val topicAndPartition = TopicAndPartition(topic, partition)
     data.get(topicAndPartition) match {
       case Some(partitionData) => partitionData
Index: core/src/main/scala/kafka/api/ProducerRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/ProducerRequest.scala	(revision 1393767)
+++ core/src/main/scala/kafka/api/ProducerRequest.scala	(working copy)
@@ -23,7 +23,26 @@
 import scala.collection.Map
 import kafka.common.TopicAndPartition
 
+object PartitionDataProducerRequest extends {
+  def readFrom(buffer: ByteBuffer): PartitionDataProducerRequest = {
+    val partition = buffer.getInt
+    val messageSetSize = buffer.getInt
+    val messageSetBuffer = buffer.slice()
+    messageSetBuffer.limit(messageSetSize)
+    buffer.position(buffer.position + messageSetSize)
+    new PartitionDataProducerRequest(partition, new ByteBufferMessageSet(messageSetBuffer))
+  }
+  val headerSize =
+    4 + /* partition */
+    4 /* messageSetSize */
+}
+
+case class PartitionDataProducerRequest(partition: Int, messages: MessageSet) {
+
+  val sizeInBytes = PartitionDataProducerRequest.headerSize + messages.sizeInBytes.intValue()
+}
+
 object ProducerRequest {
   val CurrentVersion: Short = 0
 
@@ -45,7 +64,7 @@
         val messageSetBuffer = new Array[Byte](messageSetSize)
         buffer.get(messageSetBuffer,0,messageSetSize)
         (TopicAndPartition(topic, partition),
-         new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))))
+         new PartitionDataProducerRequest(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))))
       })
     })
 
@@ -58,7 +77,7 @@
                            clientId: String,
                            requiredAcks: Short,
                            ackTimeoutMs: Int,
-                           data: Map[TopicAndPartition, PartitionData])
+                           data: Map[TopicAndPartition, PartitionDataProducerRequest])
     extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 
   /**
@@ -70,7 +89,7 @@
            clientId: String,
            requiredAcks: Short,
            ackTimeoutMs: Int,
-           data: Map[TopicAndPartition, PartitionData]) =
+           data: Map[TopicAndPartition, PartitionDataProducerRequest]) =
     this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 
   def writeTo(buffer: ByteBuffer) {
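Note (usage sketch, not part of the patch): PartitionDataProducerRequest.readFrom above implies the wire layout of one partition entry: a 4-byte partition id and a 4-byte message-set size, followed by the message-set bytes, so sizeInBytes = headerSize + message-set size. A round-trip check under that assumption; the write side is hand-coded here because the patch shows only the read side of the new class:

import java.nio.ByteBuffer
import kafka.api.PartitionDataProducerRequest
import kafka.message.{ByteBufferMessageSet, Message}

object PartitionDataRoundTripSketch {
  def main(args: Array[String]) {
    val original = new PartitionDataProducerRequest(0, new ByteBufferMessageSet(new Message("payload".getBytes)))
    val buffer = ByteBuffer.allocate(original.sizeInBytes)
    // Header: 4-byte partition id, then 4-byte message-set size (mirrors readFrom).
    buffer.putInt(original.partition)
    buffer.putInt(original.messages.sizeInBytes.intValue())
    // Body: the raw message-set bytes; duplicate() leaves the source buffer untouched.
    buffer.put(original.messages.asInstanceOf[ByteBufferMessageSet].buffer.duplicate())
    buffer.rewind()
    val decoded = PartitionDataProducerRequest.readFrom(buffer)
    assert(decoded.partition == original.partition)
    assert(decoded.sizeInBytes == original.sizeInBytes)
  }
}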