Index: core/src/main/scala/kafka/producer/SyncProducerConfig.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducerConfig.scala (revision 1294801)
+++ core/src/main/scala/kafka/producer/SyncProducerConfig.scala (working copy)
@@ -41,4 +41,23 @@
   val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
 
   val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
+
+  /* the correlation id of the producer requests */
+  val correlation_id = Utils.getInt(props, "producer.request.correlation_id", SyncProducerConfig.DefaultCorrelationId)
+
+  /* the client application sending the producer requests */
+  val client_id = Utils.getString(props, "producer.request.client_id", SyncProducerConfig.DefaultClientId)
+
+  /* the required_acks of the producer requests */
+  val required_acks = Utils.getShort(props, "producer.request.required_acks", SyncProducerConfig.DefaultRequiredAcks)
+
+  /* the ack_timeout of the producer requests */
+  val ack_timeout = Utils.getInt(props, "producer.request.ack_timeout.ms", SyncProducerConfig.DefaultAckTimeoutMs)
 }
+
+object SyncProducerConfig {
+  val DefaultCorrelationId = -1
+  val DefaultClientId = ""
+  val DefaultRequiredAcks: Short = 0
+  val DefaultAckTimeoutMs = 1
+}
\ No newline at end of file
Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1294801)
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala (working copy)
@@ -32,6 +32,7 @@
 import collection.mutable.ListBuffer
 import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
 import scala.collection.Map
+import kafka.api.{ProducerRequest, TopicData, PartitionData}
 
 /**
  * Utility functions to help with testing
@@ -329,7 +330,47 @@
       buffer += ("msg" + i)
     buffer
   }
+  /**
+   * Create a wire format request based on simple basic information
+   */
+  def produceRequest(topic: String, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
+    produceRequest(SyncProducerConfig.DefaultCorrelationId, topic, ProducerRequest.RandomPartition, message)
+  }
+  def produceRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
+    produceRequest(SyncProducerConfig.DefaultCorrelationId, topic, partition, message)
+  }
+  def produceRequest(correlation_id: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
+    val client_id = SyncProducerConfig.DefaultClientId
+    val required_acks: Short = SyncProducerConfig.DefaultRequiredAcks
+    val ack_timeout = SyncProducerConfig.DefaultAckTimeoutMs
+    val data = new Array[TopicData](1)
+    val partition_data = new Array[PartitionData](1)
+    partition_data(0) = new PartitionData(partition, message)
+    data(0) = new TopicData(topic, partition_data)
+    val pr = new kafka.api.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data)
+    pr
+  }
+
+  def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
+    produceJavaRequest(-1, topic, -1, message)
+  }
+
+  def produceJavaRequest(topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
+    produceJavaRequest(-1, topic, partition, message)
+  }
+
+  def produceJavaRequest(correlation_id: Int, topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
+    val client_id = "test"
+    val required_acks: Short = 0
+    val ack_timeout = 0
+    val data = new Array[TopicData](1)
+    val partition_data = new Array[PartitionData](1)
+    partition_data(0) = new PartitionData(partition, message.underlying)
+    data(0) = new TopicData(topic, partition_data)
+    val pr = new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data)
+    pr
+  }
 
 }
 
 object TestZKUtils {
Index: core/src/main/scala/kafka/api/ProducerRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/ProducerRequest.scala (revision 1294801)
+++ core/src/main/scala/kafka/api/ProducerRequest.scala (working copy)
@@ -24,45 +24,95 @@
 
 object ProducerRequest {
   val RandomPartition = -1
-
+  val version_id: Short = 0
+
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
-    val topic = Utils.readShortString(buffer, "UTF-8")
-    val partition = buffer.getInt
-    val messageSetSize = buffer.getInt
-    val messageSetBuffer = buffer.slice()
-    messageSetBuffer.limit(messageSetSize)
-    buffer.position(buffer.position + messageSetSize)
-    new ProducerRequest(topic, partition, new ByteBufferMessageSet(messageSetBuffer))
+    val version_id: Short = buffer.getShort
+    val correlation_id: Int = buffer.getInt
+    val client_id: String = Utils.readShortString(buffer, "UTF-8")
+    val required_acks: Short = buffer.getShort
+    val ack_timeout: Int = buffer.getInt
+    // build the topic structure
+    val topicCount = buffer.getInt
+    val data = new Array[TopicData](topicCount)
+    for(i <- 0 until topicCount) {
+      val topic = Utils.readShortString(buffer, "UTF-8")
+
+      val partitionCount = buffer.getInt
+      // build the partition structure within this topic
+      val partitionData = new Array[PartitionData](partitionCount)
+      for(j <- 0 until partitionCount) {
+        val partition = buffer.getInt
+        val messageSetSize = buffer.getInt
+        val messageSetBuffer = new Array[Byte](messageSetSize)
+        buffer.get(messageSetBuffer, 0, messageSetSize)
+        partitionData(j) = new PartitionData(partition, new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
+      }
+      data(i) = new TopicData(topic, partitionData)
+    }
+    new ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data)
   }
 }
 
-class ProducerRequest(val topic: String,
-                      val partition: Int,
-                      val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
+case class ProducerRequest(val correlation_id: Int,
+                           val client_id: String,
+                           val required_acks: Short,
+                           val ack_timeout: Int,
+                           val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
+  val version_id: Short = ProducerRequest.version_id
+
   def writeTo(buffer: ByteBuffer) {
-    Utils.writeShortString(buffer, topic)
-    buffer.putInt(partition)
-    buffer.putInt(messages.serialized.limit)
-    buffer.put(messages.serialized)
-    messages.serialized.rewind
+    buffer.putShort(version_id)
+    buffer.putInt(correlation_id)
+    Utils.writeShortString(buffer, client_id, "UTF-8")
+    buffer.putShort(required_acks)
+    buffer.putInt(ack_timeout)
+    // save the topic structure
+    buffer.putInt(data.size) // the number of topics
+    data.foreach(d => {
+      Utils.writeShortString(buffer, d.topic, "UTF-8") // write the topic
+      buffer.putInt(d.partitionData.size) // the number of partitions
+      d.partitionData.foreach(p => {
+        buffer.putInt(p.partition)
+        buffer.putInt(p.messages.getSerialized().limit)
+        buffer.put(p.messages.getSerialized())
+        p.messages.getSerialized().rewind
+      })
+    })
   }
-
-  def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int]
-
-  def getTranslatedPartition(randomSelector: String => Int): Int = {
-    if (partition == ProducerRequest.RandomPartition)
-      return randomSelector(topic)
-    else
-      return partition
+  def sizeInBytes(): Int = {
+    var size = 0
+    // version_id, correlation_id, client_id (short string), required_acks, ack_timeout, data.size
+    size = 2 + 4 + 2 + client_id.length + 2 + 4 + 4
+    data.foreach(d => {
+      size += 2 + d.topic.length + 4
+      d.partitionData.foreach(p => {
+        size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int]
+      })
+    })
+    size
+  }
 
   override def toString: String = {
     val builder = new StringBuilder()
     builder.append("ProducerRequest(")
-    builder.append(topic + ",")
-    builder.append(partition + ",")
-    builder.append(messages.sizeInBytes)
+    builder.append(version_id + ",")
+    builder.append(correlation_id + ",")
+    builder.append(client_id + ",")
+    builder.append(required_acks + ",")
+    builder.append(ack_timeout)
+    data.foreach(d => {
+      builder.append(":[" + d.topic)
+      d.partitionData.foreach(p => {
+        builder.append(":[")
+        builder.append(p.partition + ",")
+        builder.append(p.messages.sizeInBytes)
+        builder.append("]")
+      })
+      builder.append("]")
+    })
     builder.append(")")
     builder.toString
   }
@@ -70,14 +120,12 @@
   override def equals(other: Any): Boolean = {
     other match {
       case that: ProducerRequest =>
-        (that canEqual this) && topic == that.topic && partition == that.partition &&
-          messages.equals(that.messages)
+        (correlation_id == that.correlation_id &&
+         client_id == that.client_id &&
+         required_acks == that.required_acks &&
+         ack_timeout == that.ack_timeout &&
+         data.toSeq == that.data.toSeq)
       case _ => false
     }
   }
-
-  def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
-
-  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
-
-}
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/api/FetchResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/FetchResponse.scala (revision 1294801)
+++ core/src/main/scala/kafka/api/FetchResponse.scala (working copy)
@@ -39,6 +39,15 @@
 
 case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) {
   val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue()
+
+  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages)
+
+  def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = {
+    if (partition == ProducerRequest.RandomPartition)
+      return randomSelector(topic)
+    else
+      return partition
+  }
 }
 
 object TopicData {
@@ -73,6 +82,15 @@
 
 case class TopicData(topic: String, partitionData: Array[PartitionData]) {
   val sizeInBytes = 2 + topic.length + partitionData.foldLeft(4)(_ + _.sizeInBytes)
+
+  override def equals(other: Any): Boolean = {
+    other match {
+      case that: TopicData =>
+        (topic == that.topic &&
+         partitionData.toSeq == that.partitionData.toSeq)
+      case _ => false
+    }
+  }
 }
 
 object FetchResponse {
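Note on the new wire format: the patch replaces the single-topic (topic, partition, messages) producer request with a versioned layout of version_id, correlation_id, client_id, required_acks and ack_timeout, followed by a count-prefixed array of TopicData, each holding a count-prefixed array of PartitionData, so one request can carry several topic-partitions. The stand-alone sketch below is illustrative only and not part of the patch: the object name, topic, and field values are made up, and it assumes the patched classes above plus the companion ByteBufferMessageSet.getSerialized() change are applied. It round-trips one request through writeTo() and readFrom():

    import java.nio.ByteBuffer
    import kafka.api.{ProducerRequest, TopicData, PartitionData}
    import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

    // Hypothetical driver, not part of the patch.
    object ProducerRequestRoundTrip {
      def main(args: Array[String]) {
        // one message set destined for partition 0 of a single topic
        val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes))
        val data = Array(new TopicData("test-topic", Array(new PartitionData(0, messages))))
        val acks: Short = 0
        val request = new ProducerRequest(1, "demo-client", acks, 1, data)

        // sizeInBytes() mirrors what writeTo() emits, so the buffer fits exactly
        val buffer = ByteBuffer.allocate(request.sizeInBytes())
        request.writeTo(buffer)
        buffer.rewind()

        // readFrom() rebuilds the request; equals() ignores version_id and should
        // hold provided ByteBufferMessageSet compares message bytes by content
        val decoded = ProducerRequest.readFrom(buffer)
        assert(request == decoded)
      }
    }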