Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala =================================================================== --- core/src/test/scala/unit/kafka/utils/TestUtils.scala (revision 1245648) +++ core/src/test/scala/unit/kafka/utils/TestUtils.scala (working copy) @@ -32,6 +32,7 @@ import collection.mutable.ListBuffer import kafka.consumer.{KafkaMessageStream, ConsumerConfig} import scala.collection.Map +import kafka.api.{TopicData, PartitionData} /** * Utility functions to help with testing */ @@ -329,7 +330,47 @@ buffer += ("msg" + i) buffer } + /** + * Create a wire format produce request from basic information + */ + def produceRequest(topic: String, message: ByteBufferMessageSet): kafka.api.ProducerRequest = { + produceRequest(-1,topic,-1,message) + } + def produceRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = { + produceRequest(-1,topic,partition,message) + } + def produceRequest(correlation_id: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = { + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[TopicData](1) + var partition_data = new Array[PartitionData](1) + partition_data(0) = new PartitionData(partition,message) + data(0) = new TopicData(topic,partition_data) + val pr = new kafka.api.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data) + pr + } + + def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = { + produceJavaRequest(-1,topic,-1,message) + } + + def produceJavaRequest(topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = { + produceJavaRequest(-1,topic,partition,message) + } + + def produceJavaRequest(correlation_id: Int, topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = { + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[TopicData](1) + var partition_data = new Array[PartitionData](1) + partition_data(0) = new PartitionData(partition,message.underlying) + data(0) = new TopicData(topic,partition_data) + val pr = new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data) + pr + } } object TestZKUtils { Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/FetcherTest.scala (revision 1245648) +++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala (working copy) @@ -83,7 +83,7 @@ val ms = 0.until(messagesPerNode).map(x => new Message((conf.brokerId * 5 + x).toString.getBytes)).toArray val mSet = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = ms: _*) messages += conf.brokerId -> mSet - producer.send(topic, mSet) + producer.send(TestUtils.produceRequest(topic, mSet)) producer.close() count += ms.size } Index: core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (revision 1245648) +++ core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (working copy) @@ -61,7 +61,7 @@ val producer = TestUtils.createProducer("localhost", brokerPort) for(i <- 0 until numMessages) { -
producer.send(topic, TestUtils.singleMessageSet("test".getBytes())) + producer.send(TestUtils.produceRequest(topic, TestUtils.singleMessageSet("test".getBytes()))) } // update offset in zookeeper for consumer to jump "forward" in time @@ -115,7 +115,7 @@ val producer = TestUtils.createProducer("localhost", brokerPort) for(i <- 0 until numMessages) { - producer.send(topic, TestUtils.singleMessageSet("test".getBytes())) + producer.send(TestUtils.produceRequest(topic, TestUtils.singleMessageSet("test".getBytes()))) } // update offset in zookeeper for consumer to jump "forward" in time @@ -168,7 +168,7 @@ val producer = TestUtils.createProducer("localhost", brokerPort) for(i <- 0 until numMessages) { - producer.send(topic, TestUtils.singleMessageSet("test".getBytes())) + producer.send(TestUtils.produceRequest(topic, TestUtils.singleMessageSet("test".getBytes()))) } // update offset in zookeeper for consumer to jump "forward" in time Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (revision 1245648) +++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (working copy) @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import java.util.Properties import junit.framework.Assert._ -import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder, ProducerRequest} +import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder, ProducerRequest, TopicData, PartitionData} import kafka.common.{FetchRequestFormatException, OffsetOutOfRangeException, InvalidPartitionException} import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet} import kafka.producer.{ProducerData, Producer, ProducerConfig} @@ -139,7 +139,7 @@ val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set - producer.send(topic, set) + producer.send(TestUtils.produceRequest(topic, set)) set.getBuffer.rewind builder.addFetch(topic, partition, 0, 10000) } @@ -205,7 +205,7 @@ val set = new ByteBufferMessageSet(DefaultCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set - producer.send(topic, set) + producer.send(TestUtils.produceRequest(topic, set)) set.getBuffer.rewind builder.addFetch(topic, partition, 0, 10000) } @@ -266,15 +266,23 @@ val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val messages = new mutable.HashMap[String, ByteBufferMessageSet] val builder = new FetchRequestBuilder() - var produceList: List[ProducerRequest] = Nil + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[TopicData](topics.size) + var index = 0 for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set - produceList ::= new ProducerRequest(topic, 0, set) + var partition_data = new Array[PartitionData](1) + partition_data(0) = new PartitionData(0,set) + data(index) = new TopicData(topic,partition_data) + index += 1 builder.addFetch(topic, partition, 0, 10000) } - producer.multiSend(produceList.toArray) + val producerRequest = new ProducerRequest(-1, client_id, required_acks, ack_timeout, data) + producer.send(producerRequest) for (messageSet <- messages.values) 
messageSet.getBuffer.rewind @@ -294,15 +302,23 @@ val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val messages = new mutable.HashMap[String, ByteBufferMessageSet] val builder = new FetchRequestBuilder() - var produceList: List[ProducerRequest] = Nil + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[TopicData](topics.size) + var index = 0 for( (topic, partition) <- topics) { - val set = new ByteBufferMessageSet(DefaultCompressionCodec, + val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set - produceList ::= new ProducerRequest(topic, 0, set) + var partition_data = new Array[PartitionData](1) + partition_data(0) = new PartitionData(0,set) + data(index) = new TopicData(topic,partition_data) + index += 1 builder.addFetch(topic, partition, 0, 10000) } - producer.multiSend(produceList.toArray) + val producerRequest = new ProducerRequest(-1, client_id, required_acks, ack_timeout, data) + producer.send(producerRequest) for (messageSet <- messages.values) messageSet.getBuffer.rewind Index: core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (revision 1245648) +++ core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (working copy) @@ -48,7 +48,7 @@ // send some messages val sent1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("hello".getBytes())) - producer.send(topic, sent1) + producer.send(TestUtils.produceRequest(topic, sent1)) Thread.sleep(200) // corrupt the file on disk Index: core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (revision 1245648) +++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (working copy) @@ -17,7 +17,7 @@ package kafka.integration -import kafka.api.{FetchRequestBuilder, ProducerRequest} +import kafka.api.{FetchRequestBuilder, ProducerRequest, PartitionData, TopicData} import kafka.common.OffsetOutOfRangeException import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig} @@ -63,7 +63,7 @@ val topic = "test" val sent = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes())) - producer.send(topic, sent) + producer.send(TestUtils.produceRequest(topic, sent)) sent.getBuffer.rewind var fetchedMessage: ByteBufferMessageSet = null @@ -92,7 +92,7 @@ for( (topic, offset) <- topicOffsets) { val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) - producer.send(topic, set) + producer.send(TestUtils.produceRequest(topic, set)) set.getBuffer.rewind messages += topic -> set builder.addFetch(topic, offset, 0, 10000) @@ -132,15 +132,23 @@ val topics = List("test1", "test2", "test3"); val messages = new mutable.HashMap[String, ByteBufferMessageSet] val builder = new FetchRequestBuilder() - var produceList: List[ProducerRequest] = Nil + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[TopicData](topics.size) + var index = 0 for(topic <- topics) { val set = new 
ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set - produceList ::= new ProducerRequest(topic, 0, set) + var partition_data = new Array[PartitionData](1) + partition_data(0) = new PartitionData(0,set) + data(index) = new TopicData(topic,partition_data) + index += 1 builder.addFetch(topic, 0, 0, 10000) } - producer.multiSend(produceList.toArray) + val producerRequest = new ProducerRequest(-1, client_id, required_acks, ack_timeout, data) + producer.send(producerRequest) for (messageSet <- messages.values) messageSet.getBuffer.rewind @@ -160,18 +168,26 @@ val topics = List("test1", "test2", "test3"); val messages = new mutable.HashMap[String, ByteBufferMessageSet] val builder = new FetchRequestBuilder() - var produceList: List[ProducerRequest] = Nil + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[TopicData](topics.size) + var index = 0 for(topic <- topics) { val set = new ByteBufferMessageSet(NoCompressionCodec, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes)) messages += topic -> set - produceList ::= new ProducerRequest(topic, 0, set) + var partition_data = new Array[PartitionData](1) + partition_data(0) = new PartitionData(0,set) + data(index) = new TopicData(topic,partition_data) + index += 1 builder.addFetch(topic, 0, 0, 10000) } - producer.multiSend(produceList.toArray) + val producerRequest = new ProducerRequest(-1, client_id, required_acks, ack_timeout, data) + producer.send(producerRequest) // resend the same multisend - producer.multiSend(produceList.toArray) + producer.send(producerRequest) for (messageSet <- messages.values) messageSet.getBuffer.rewind Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (revision 1245648) +++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (working copy) @@ -44,7 +44,7 @@ var failed = false val firstStart = SystemTime.milliseconds try { - producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))) + producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) }catch { case e: Exception => failed=true } @@ -54,7 +54,7 @@ Assert.assertTrue((firstEnd-firstStart) < 500) val secondStart = SystemTime.milliseconds try { - producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))) + producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) }catch { case e: Exception => failed = true } @@ -63,7 +63,7 @@ Assert.assertTrue((secondEnd-secondStart) < 500) try { - producer.multiSend(Array(new ProducerRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))) + producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) }catch { case e: Exception => failed=true } @@ -83,7 +83,7 @@ val bytes = new Array[Byte](101) var failed = false try { - producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new 
Message(bytes))) + producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(bytes)))) }catch { case e: MessageSizeTooLargeException => failed = true } Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (revision 1245648) +++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (working copy) @@ -311,11 +311,11 @@ val topic = "topic1" val msgs = TestUtils.getMsgStrings(10) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) - mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, ProducerRequest.RandomPartition, - messagesToSet(msgs.take(5)))))) + mockSyncProducer.send(TestUtils.produceRequest(topic, ProducerRequest.RandomPartition, + messagesToSet(msgs.take(5)))) EasyMock.expectLastCall - mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, ProducerRequest.RandomPartition, - messagesToSet(msgs.takeRight(5)))))) + mockSyncProducer.send(TestUtils.produceRequest(topic, ProducerRequest.RandomPartition, + messagesToSet(msgs.takeRight(5)))) EasyMock.expectLastCall mockSyncProducer.close EasyMock.expectLastCall @@ -399,11 +399,8 @@ } class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) { - override def send(topic: String, messages: ByteBufferMessageSet): Unit = { + override def send(produceRequest: ProducerRequest): Unit = { Thread.sleep(1000) } - override def multiSend(produces: Array[ProducerRequest]) { - Thread.sleep(1000) - } } } Index: core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala =================================================================== --- core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (revision 1245648) +++ core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (working copy) @@ -33,7 +33,7 @@ // create a ByteBufferMessageSet that doesn't contain a full message // iterating it should get an InvalidMessageSizeException val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes())) - val buffer = messages.serialized.slice + val buffer = messages.getSerialized().slice buffer.limit(10) val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000) try { @@ -51,7 +51,7 @@ { val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes())) val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2) - buffer.put(messages.serialized) + buffer.put(messages.getSerialized()) buffer.putShort(4) val messagesPlus = new ByteBufferMessageSet(buffer) assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes) @@ -93,7 +93,7 @@ //make sure ByteBufferMessageSet is re-iterable. TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) //make sure the last offset after iteration is correct - assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit) + assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit) } // test for compressed regular messages @@ -103,7 +103,7 @@ //make sure ByteBufferMessageSet is re-iterable. 
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator)) //make sure the last offset after iteration is correct - assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit) + assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit) } // test for mixed empty and non-empty messagesets uncompressed @@ -111,16 +111,16 @@ val emptyMessageList : List[Message] = Nil val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*) val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*) - val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit) - buffer.put(emptyMessageSet.serialized) - buffer.put(regularMessgeSet.serialized) + val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit) + buffer.put(emptyMessageSet.getSerialized()) + buffer.put(regularMessgeSet.getSerialized()) buffer.rewind val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0) TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) //make sure ByteBufferMessageSet is re-iterable. TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) //make sure the last offset after iteration is correct - assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit) + assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit) } // test for mixed empty and non-empty messagesets compressed @@ -128,16 +128,16 @@ val emptyMessageList : List[Message] = Nil val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*) val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*) - val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit) - buffer.put(emptyMessageSet.serialized) - buffer.put(regularMessgeSet.serialized) + val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit) + buffer.put(emptyMessageSet.getSerialized()) + buffer.put(regularMessgeSet.getSerialized()) buffer.rewind val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0) TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) //make sure ByteBufferMessageSet is re-iterable. 
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) //make sure the last offset after iteration is correct - assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit) + assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit) } } Index: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala =================================================================== --- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (revision 1245648) +++ core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (working copy) @@ -56,7 +56,7 @@ server.startup() // send some messages - producer.send(topic, sent1) + producer.send(TestUtils.produceRequest(topic, sent1)) sent1.getBuffer.rewind Thread.sleep(200) @@ -91,7 +91,7 @@ val newOffset = fetchedMessage.validBytes // send some more messages - producer.send(topic, sent2) + producer.send(TestUtils.produceRequest(topic, sent2)) sent2.getBuffer.rewind Thread.sleep(200) Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala =================================================================== --- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (revision 1245648) +++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (working copy) @@ -254,7 +254,7 @@ val mSet = new ByteBufferMessageSet(compressionCodec = compression, messages = ms: _*) for (message <- ms) messages ::= message - producer.send(topic, partition, mSet) + producer.send(TestUtils.produceRequest(topic, partition, mSet)) } producer.close() messages Index: core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala =================================================================== --- core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (revision 1245648) +++ core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (working copy) @@ -27,6 +27,7 @@ import kafka.utils.TestUtils import org.apache.log4j.{Level, Logger} import org.scalatest.junit.JUnit3Suite +import kafka.api.{TopicData, PartitionData} /** * End to end tests of the primitive apis against a local server @@ -346,16 +347,24 @@ val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); val messages = new mutable.HashMap[String, ByteBufferMessageSet] val builder = new FetchRequestBuilder() - var produceList: List[ProducerRequest] = Nil + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[TopicData](topics.size) + var index = 0 for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set - produceList ::= new ProducerRequest(topic, 0, set) + var partition_data = new Array[PartitionData](1) + partition_data(0) = new PartitionData(0,set.underlying) + data(index) = new TopicData(topic,partition_data) + index += 1 builder.addFetch(topic, partition, 0, 10000) } - producer.multiSend(produceList.toArray) + val producerRequest = new kafka.javaapi.ProducerRequest(-1, client_id, required_acks, ack_timeout, data) + producer.send(producerRequest) for (messageSet <- messages.values) messageSet.getBuffer.rewind @@ -379,16 +388,24 @@ val topics = List(("test4", 0), ("test1", 0), ("test2", 0), 
("test3", 0)); val messages = new mutable.HashMap[String, ByteBufferMessageSet] val builder = new FetchRequestBuilder() - var produceList: List[ProducerRequest] = Nil + val client_id = "test" + val required_acks: Short = 0 + val ack_timeout = 0 + var data = new Array[TopicData](topics.size) + var index = 0 for( (topic, partition) <- topics) { val set = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = getMessageList(new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))) messages += topic -> set - produceList ::= new ProducerRequest(topic, 0, set) + var partition_data = new Array[PartitionData](1) + partition_data(0) = new PartitionData(0,set.underlying) + data(index) = new TopicData(topic,partition_data) + index += 1 builder.addFetch(topic, partition, 0, 10000) } - producer.multiSend(produceList.toArray) + val producerRequest = new ProducerRequest(-1, client_id, required_acks, ack_timeout, data) + producer.send(producerRequest) for (messageSet <- messages.values) messageSet.getBuffer.rewind @@ -412,4 +429,5 @@ messages.foreach(m => messageList.add(m)) messageList } -} + +} \ No newline at end of file Index: core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala =================================================================== --- core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (revision 1245648) +++ core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (working copy) @@ -74,7 +74,7 @@ val mSet = new ByteBufferMessageSet(compressionCodec = compressed, messages = getMessageList(ms: _*)) for (message <- ms) messages ::= message - producer.send(topic, partition, mSet) + producer.send(TestUtils.produceJavaRequest(topic, partition, mSet)) } producer.close() messages Index: core/src/main/scala/kafka/producer/SyncProducerConfig.scala =================================================================== --- core/src/main/scala/kafka/producer/SyncProducerConfig.scala (revision 1245648) +++ core/src/main/scala/kafka/producer/SyncProducerConfig.scala (working copy) @@ -41,4 +41,16 @@ val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000) val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000) -} + + /* the correlation id to stamp on producer requests */ + val correlation_id = Utils.getInt(props,"producer.request.correlation_id",-1) + + /* the client application sending the producer requests */ + val client_id = Utils.getString(props,"producer.request.client_id","") + + /* the required_acks of the producer requests */ + val required_acks = Utils.getShort(props,"producer.request.required_acks",0) + + /* the ack_timeout of the producer requests */ + val ack_timeout = Utils.getInt(props,"producer.request.ack_timeout",1) +} \ No newline at end of file Index: core/src/main/scala/kafka/producer/SyncProducer.scala =================================================================== --- core/src/main/scala/kafka/producer/SyncProducer.scala (revision 1245648) +++ core/src/main/scala/kafka/producer/SyncProducer.scala (working copy) @@ -50,29 +50,10 @@ if (logger.isTraceEnabled) { trace("verifying sendbuffer of size " + buffer.limit) val requestTypeId = buffer.getShort() - if (requestTypeId == RequestKeys.MultiProduce) { - try { - val request = MultiProducerRequest.readFrom(buffer) - for (produce <- request.produces) { - try { - for (messageAndOffset <- produce.messages) - if (!messageAndOffset.message.isValid) - trace("topic " +
produce.topic + " is invalid") - } - catch { - case e: Throwable => - trace("error iterating messages ", e) - } - } - } - catch { - case e: Throwable => - trace("error verifying sendbuffer ", e) - } - } + val request = ProducerRequest.readFrom(buffer) + trace(request.toString) } } - /** * Common functionality for the public send methods */ @@ -107,23 +88,17 @@ /** * Send a message */ - def send(topic: String, partition: Int, messages: ByteBufferMessageSet) { - verifyMessageSize(messages) - val setSize = messages.sizeInBytes.asInstanceOf[Int] - trace("Got message set with " + setSize + " bytes to send") - send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages))) + def send(producerRequest: ProducerRequest) { + producerRequest.data.foreach(d => { + d.partitionData.foreach(p => { + verifyMessageSize(new ByteBufferMessageSet(p.messages.getSerialized())) + val setSize = p.messages.sizeInBytes.asInstanceOf[Int] + trace("Got message set with " + setSize + " bytes to send") + }) + }) + send(new BoundedByteBufferSend(producerRequest)) } - - def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, ProducerRequest.RandomPartition, messages) - def multiSend(produces: Array[ProducerRequest]) { - for (request <- produces) - verifyMessageSize(request.messages) - val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes) - trace("Got multi message sets with " + setSize + " bytes to send") - send(new BoundedByteBufferSend(new MultiProducerRequest(produces))) - } - def close() = { lock synchronized { disconnect() Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala =================================================================== --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1245648) +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy) @@ -17,7 +17,7 @@ package kafka.producer.async -import kafka.api.ProducerRequest +import kafka.api.{ProducerRequest, TopicData, PartitionData} import kafka.serializer.Encoder import java.util.Properties import kafka.producer._ @@ -177,9 +177,22 @@ private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]) { if(messagesPerTopic.size > 0) { - val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray + val topics = new HashMap[String, ListBuffer[PartitionData]]() + val requests = messagesPerTopic.map(f => { + val topicName = f._1._1 + val partitionId = f._1._2 + val messagesSet= f._2 + val topic = topics.get(topicName) // checking to see if this topics exists + topic match { + case None => topics += topicName -> new ListBuffer[PartitionData]() //create a new listbuffer for this topic + case Some(x) => trace("found " + topicName) + } + topics(topicName).append(new PartitionData(partitionId, messagesSet)) + }) + val topic_data = topics.map(kv => new TopicData(kv._1,kv._2.toArray)) + val producerRequest = new ProducerRequest(config.correlation_id, config.client_id, config.required_acks, config.ack_timeout, topic_data.toArray) //new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, topic_data.toArray) val syncProducer = producerPool.getProducer(brokerId) - syncProducer.multiSend(requests) + syncProducer.send(producerRequest) trace("kafka producer sent messages for topics %s to broker %s:%d" .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port)) } Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
=================================================================== --- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (revision 1245648) +++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (working copy) @@ -53,7 +53,7 @@ def getErrorCode = errorCode - def serialized(): ByteBuffer = buffer + def getSerialized(): ByteBuffer = buffer def validBytes: Long = shallowValidBytes Index: core/src/main/scala/kafka/message/FileMessageSet.scala =================================================================== --- core/src/main/scala/kafka/message/FileMessageSet.scala (revision 1245648) +++ core/src/main/scala/kafka/message/FileMessageSet.scala (working copy) @@ -40,6 +40,8 @@ private val setSize = new AtomicLong() private val setHighWaterMark = new AtomicLong() + def getSerialized(): ByteBuffer = throw new java.lang.UnsupportedOperationException() + if(mutable) { if(limit < Long.MaxValue || offset > 0) throw new IllegalArgumentException("Attempt to open a mutable message set with a view or offset, which is not allowed.") Index: core/src/main/scala/kafka/message/MessageSet.scala =================================================================== --- core/src/main/scala/kafka/message/MessageSet.scala (revision 1245648) +++ core/src/main/scala/kafka/message/MessageSet.scala (working copy) @@ -111,4 +111,9 @@ throw new InvalidMessageException } + /** + * Allows subclasses to expose their serialized ByteBuffer representation + */ + def getSerialized(): ByteBuffer + } Index: core/src/main/scala/kafka/utils/Utils.scala =================================================================== --- core/src/main/scala/kafka/utils/Utils.scala (revision 1245648) +++ core/src/main/scala/kafka/utils/Utils.scala (working copy) @@ -194,6 +194,9 @@ def getInt(props: Properties, name: String, default: Int): Int = getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue)) + def getShort(props: Properties, name: String, default: Short): Short = + getShortInRange(props, name, default, (Short.MinValue, Short.MaxValue)) + /** * Read an integer from the properties instance.
Throw an exception * if the value is not in the given range (inclusive) @@ -216,6 +219,18 @@ v } + def getShortInRange(props: Properties, name: String, default: Short, range: (Short, Short)): Short = { + val v = + if(props.containsKey(name)) + props.getProperty(name).toShort + else + default + if(v < range._1 || v > range._2) + throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".") + else + v + } + def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = { val value = buffer.getInt if(value < range._1 || value > range._2) @@ -762,4 +777,4 @@ def durationMs: Double = (end.get - start) / (1000.0 * 1000.0) } -} +} \ No newline at end of file Index: core/src/main/scala/kafka/server/KafkaApis.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaApis.scala (revision 1245648) +++ core/src/main/scala/kafka/server/KafkaApis.scala (working copy) @@ -41,7 +41,6 @@ apiId match { case RequestKeys.Produce => handleProducerRequest(receive) case RequestKeys.Fetch => handleFetchRequest(receive) - case RequestKeys.MultiProduce => handleMultiProducerRequest(receive) case RequestKeys.Offsets => handleOffsetRequest(receive) case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive) case _ => throw new IllegalStateException("No mapping found for handler id " + apiId) @@ -59,31 +58,38 @@ None } - def handleMultiProducerRequest(receive: Receive): Option[Send] = { - val request = MultiProducerRequest.readFrom(receive.buffer) - if(requestLogger.isTraceEnabled) - requestLogger.trace("Multiproducer request " + request.toString) - request.produces.map(handleProducerRequest(_, "MultiProducerRequest")) - None - } - - private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = { - val partition = request.getTranslatedPartition(logManager.chooseRandomPartition) - try { - logManager.getOrCreateLog(request.topic, partition).append(request.messages) - trace(request.messages.sizeInBytes + " bytes written to logs.") - } catch { - case e => - error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e) - e match { - case _: IOException => - fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) - System.exit(1) - case _ => + private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String): Option[ProducerResponse] = { + val requestSize = request.data.size + val errors = new Array[Int](requestSize) + val offsets = new Array[Long](requestSize) + + request.data.foreach(d => { + d.partitionData.foreach(p => { + val partition = p.getTranslatedPartition(d.topic, logManager.chooseRandomPartition) + try { + logManager.getOrCreateLog(d.topic, partition).append(p.messages) + trace(p.messages.sizeInBytes + " bytes written to logs.") + p.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum))) } - throw e - } - None + catch { + case e => + //TODO: handle response in ProducerResponse + error("Error processing " + requestHandlerName + " on " + d.topic + ":" + partition, e) + e match { + case _: IOException => + fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) + Runtime.getRuntime.halt(1) + case _ => + } + //throw e + } + }) + //None + }) + if (request.required_acks == 0) + None + else + Some(new ProducerResponse(request.correlation_id, request.version_id, errors, offsets)) } def 
handleFetchRequest(request: Receive): Option[Send] = { Index: core/src/main/scala/kafka/api/ProducerResponse.scala =================================================================== --- core/src/main/scala/kafka/api/ProducerResponse.scala (revision 0) +++ core/src/main/scala/kafka/api/ProducerResponse.scala (revision 0) @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio._ +import java.nio.channels._ +import kafka.network._ +import kafka.message._ +import kafka.utils._ +import kafka.common.ErrorMapping + +@nonthreadsafe +class ProducerResponse(val correlation_id: Int, val version_id: Short, val errors: Array[Int], val offsets: Array[Long]) extends Send { + + val sizeInBytes = 4 + 2 + 4 + (4 * errors.size) + 4 + (8 * offsets.size) + + private val buffer = ByteBuffer.allocate(sizeInBytes) + buffer.putInt(correlation_id) + buffer.putShort(version_id) + buffer.putInt(errors.size) + errors.foreach(e => buffer.putInt(e)) + buffer.putInt(offsets.size) + offsets.foreach(o => buffer.putLong(o)) + + var complete: Boolean = false + + def writeTo(channel: GatheringByteChannel): Int = { + expectIncomplete() + var written = 0 + written += channel.write(buffer) + if(!buffer.hasRemaining) + complete = true + written + } +} \ No newline at end of file Index: core/src/main/scala/kafka/api/FetchResponse.scala =================================================================== --- core/src/main/scala/kafka/api/FetchResponse.scala (revision 1245648) +++ core/src/main/scala/kafka/api/FetchResponse.scala (working copy) @@ -39,6 +39,15 @@ case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) { val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue() + + def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages) + + def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = { + if (partition == ProducerRequest.RandomPartition) + return randomSelector(topic) + else + return partition + } } object TopicData { Index: core/src/main/scala/kafka/api/ProducerRequest.scala =================================================================== --- core/src/main/scala/kafka/api/ProducerRequest.scala (revision 1245648) +++ core/src/main/scala/kafka/api/ProducerRequest.scala (working copy) @@ -24,60 +24,98 @@ object ProducerRequest { val RandomPartition = -1 - + val version_id: Short = 0 + def readFrom(buffer: ByteBuffer): ProducerRequest = { - val topic = Utils.readShortString(buffer, "UTF-8") - val partition = buffer.getInt - val messageSetSize = buffer.getInt - val messageSetBuffer = buffer.slice() - messageSetBuffer.limit(messageSetSize) - buffer.position(buffer.position + 
messageSetSize) - new ProducerRequest(topic, partition, new ByteBufferMessageSet(messageSetBuffer)) + val version_id: Short = buffer.getShort + val correlation_id: Int = buffer.getInt + val client_id: String = Utils.readShortString(buffer, "UTF-8") + val required_acks: Short = buffer.getShort + val ack_timeout: Int = buffer.getInt + //build the topic structure + val topicCount = buffer.getInt + val data = new Array[TopicData](topicCount) + for(i <- 0 until topicCount) { + val topic = Utils.readShortString(buffer, "UTF-8") + + val partitionCount = buffer.getInt + //build the partition structure within this topic + val partitionData = new Array[PartitionData](partitionCount) + for (j <- 0 until partitionCount) { + val partition = buffer.getInt + val messageSetSize = buffer.getInt + val messageSetBuffer = new Array[Byte](messageSetSize) + buffer.get(messageSetBuffer,0,messageSetSize) + partitionData(j) = new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))) + } + data(i) = new TopicData(topic,partitionData) + } + new ProducerRequest(correlation_id,client_id,required_acks,ack_timeout,data) } } -class ProducerRequest(val topic: String, - val partition: Int, - val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) { +case class ProducerRequest(val correlation_id: Int, + val client_id: String, + val required_acks: Short, + val ack_timeout: Int, + val data: Array[TopicData]) extends Request(RequestKeys.Produce) { + val version_id: Short = ProducerRequest.version_id + def writeTo(buffer: ByteBuffer) { - Utils.writeShortString(buffer, topic) - buffer.putInt(partition) - buffer.putInt(messages.serialized.limit) - buffer.put(messages.serialized) - messages.serialized.rewind + buffer.putShort(version_id) + buffer.putInt(correlation_id) + Utils.writeShortString(buffer, client_id, "UTF-8") + buffer.putShort(required_acks) + buffer.putInt(ack_timeout) + //save the topic structure + buffer.putInt(data.size) //the number of topics + data.foreach(d =>{ + Utils.writeShortString(buffer, d.topic, "UTF-8") //write the topic + buffer.putInt(d.partitionData.size) //the number of partitions + d.partitionData.foreach(p => { + buffer.putInt(p.partition) + buffer.putInt(p.messages.getSerialized().limit) + buffer.put(p.messages.getSerialized()) + p.messages.getSerialized().rewind + }) + }) } - - def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int] - def getTranslatedPartition(randomSelector: String => Int): Int = { - if (partition == ProducerRequest.RandomPartition) - return randomSelector(topic) - else - return partition + def sizeInBytes(): Int = { + var size = 0 + //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size + size = 2 + 4 + 2 + client_id.length + 2 + 4 + 4; + data.foreach(d =>{ + size += 2 + d.topic.length + 4 + d.partitionData.foreach(p => { + size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int] + }) + }) + size } override def toString: String = { val builder = new StringBuilder() builder.append("ProducerRequest(") - builder.append(topic + ",") - builder.append(partition + ",") - builder.append(messages.sizeInBytes) + builder.append(version_id + ",") + builder.append(correlation_id + ",") + builder.append(client_id + ",") + builder.append(required_acks + ",") + builder.append(ack_timeout) + data.foreach(d =>{ + builder.append(":[" + d.topic) + d.partitionData.foreach(p => { + builder.append(":[") + builder.append(p.partition + ",") + 
builder.append(p.messages.sizeInBytes) + builder.append("]") + }) + builder.append("]") + }) builder.append(")") builder.toString } - override def equals(other: Any): Boolean = { - other match { - case that: ProducerRequest => - (that canEqual this) && topic == that.topic && partition == that.partition && - messages.equals(that.messages) - case _ => false - } - } - def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] - - override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode - -} +} \ No newline at end of file Index: core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala =================================================================== --- core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (revision 1245648) +++ core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (working copy) @@ -18,6 +18,8 @@ import kafka.producer.SyncProducerConfig import kafka.javaapi.message.ByteBufferMessageSet +import kafka.javaapi.ProducerRequest +import kafka.api.{PartitionData, TopicData} class SyncProducer(syncProducer: kafka.producer.SyncProducer) { @@ -25,21 +27,17 @@ val underlying = syncProducer - def send(topic: String, partition: Int, messages: ByteBufferMessageSet) { - import kafka.javaapi.Implicits._ - underlying.send(topic, partition, messages) + def send(producerRequest: kafka.javaapi.ProducerRequest) { + underlying.send(producerRequest.underlying) } - def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, - kafka.api.ProducerRequest.RandomPartition, - messages) - - def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) { - import kafka.javaapi.Implicits._ - val produceRequests = new Array[kafka.api.ProducerRequest](produces.length) - for(i <- 0 until produces.length) - produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages) - underlying.multiSend(produceRequests) + def send(topic: String, messages: ByteBufferMessageSet): Unit = { + var data = new Array[TopicData](1) + var partition_data = new Array[PartitionData](1) + partition_data(0) = new PartitionData(-1,messages.underlying) + data(0) = new TopicData(topic,partition_data) + val producerRequest = new kafka.api.ProducerRequest(-1, "", 0, 0, data) + underlying.send(producerRequest) } def close() { Index: core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala =================================================================== --- core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (revision 1245648) +++ core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (working copy) @@ -39,7 +39,7 @@ def validBytes: Long = underlying.validBytes - def serialized():ByteBuffer = underlying.serialized + def serialized():ByteBuffer = underlying.getSerialized() def getInitialOffset = initialOffset Index: core/src/main/scala/kafka/javaapi/FetchResponse.scala =================================================================== --- core/src/main/scala/kafka/javaapi/FetchResponse.scala (revision 1245648) +++ core/src/main/scala/kafka/javaapi/FetchResponse.scala (working copy) @@ -22,7 +22,7 @@ class FetchResponse( val versionId: Short, val correlationId: Int, - val data: Array[TopicData] ) { + private val data: Array[TopicData] ) { private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data) Index: core/src/main/scala/kafka/javaapi/ProducerRequest.scala =================================================================== --- 
core/src/main/scala/kafka/javaapi/ProducerRequest.scala (revision 1245648) +++ core/src/main/scala/kafka/javaapi/ProducerRequest.scala (working copy) @@ -17,36 +17,29 @@ package kafka.javaapi import kafka.network.Request -import kafka.api.RequestKeys +import kafka.api.{RequestKeys, TopicData} import java.nio.ByteBuffer -class ProducerRequest(val topic: String, - val partition: Int, - val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) { +class ProducerRequest(val correlation_id: Int, + val client_id: String, + val required_acks: Short, + val ack_timeout: Int, + val data: Array[TopicData]) extends Request(RequestKeys.Produce) { + import Implicits._ - private val underlying = new kafka.api.ProducerRequest(topic, partition, messages) + val underlying = new kafka.api.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, data) def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) } def sizeInBytes(): Int = underlying.sizeInBytes - def getTranslatedPartition(randomSelector: String => Int): Int = - underlying.getTranslatedPartition(randomSelector) - override def toString: String = underlying.toString - override def equals(other: Any): Boolean = { - other match { - case that: ProducerRequest => - (that canEqual this) && topic == that.topic && partition == that.partition && - messages.equals(that.messages) - case _ => false - } - } + override def equals(other: Any): Boolean = underlying.equals(other) def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest] - override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode + override def hashCode: Int = underlying.hashCode -} +} \ No newline at end of file
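
The test changes above all funnel into the same pattern: wrap a message set in PartitionData, wrap that in TopicData, and hand a single ProducerRequest to SyncProducer.send. A minimal, self-contained sketch of that flow (not part of the patch; the broker host/port, topic name and client id are placeholders, and "host"/"port" are assumed to be the only other properties SyncProducerConfig needs):

import java.util.Properties
import kafka.api.{PartitionData, TopicData, ProducerRequest}
import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
import kafka.producer.{SyncProducer, SyncProducerConfig}

object ProduceRequestSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put("host", "localhost") // placeholder broker host
    props.put("port", "9092")      // placeholder broker port
    val producer = new SyncProducer(new SyncProducerConfig(props))

    // One message set for one topic/partition, wrapped in the new nested structure
    val messages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                            messages = new Message("hello".getBytes))
    val partitionData = Array(new PartitionData(0, messages))
    val topicData = Array(new TopicData("test-topic", partitionData))

    // correlation_id = -1, client_id = "sketch", required_acks = 0, ack_timeout = 0
    val request = new ProducerRequest(-1, "sketch", 0, 0, topicData)
    producer.send(request) // single entry point replacing send(topic, ...) and multiSend(...)
    producer.close()
  }
}

TestUtils.produceRequest and produceJavaRequest exist so that single-topic tests keep a one-line call site while the request itself carries the full nested structure.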
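
On the wire, the request body now serializes as: version_id, correlation_id, client_id (short string), required_acks, ack_timeout, a topic count, then per topic the topic name, a partition count, and per partition the partition id, the message-set size and the message-set bytes. A small round-trip sketch, assuming only the writeTo, sizeInBytes and readFrom defined in this patch (topic and client id are placeholders):

import java.nio.ByteBuffer
import kafka.api.{PartitionData, TopicData, ProducerRequest}
import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

object ProducerRequestRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val set = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                       messages = new Message("payload".getBytes))
    val data = Array(new TopicData("test-topic", Array(new PartitionData(0, set))))
    val original = new ProducerRequest(1, "sketch", 0, 0, data)

    // writeTo emits only the request body; the request type id and length prefix
    // are added separately by the network layer when the request is actually sent
    val buffer = ByteBuffer.allocate(original.sizeInBytes())
    original.writeTo(buffer)
    buffer.rewind()

    // readFrom consumes the same layout and rebuilds the nested topic/partition structure
    val decoded = ProducerRequest.readFrom(buffer)
    println(decoded)
  }
}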
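
The four new SyncProducerConfig values are driven by the property names shown in the SyncProducerConfig hunk, so they can be tuned without code changes. A sketch of wiring them up, assuming "host" and "port" remain the only other required properties; the values shown are arbitrary:

import java.util.Properties
import kafka.producer.SyncProducerConfig

object ProducerRequestConfigSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.put("host", "localhost")                        // placeholder broker host
    props.put("port", "9092")                             // placeholder broker port
    props.put("producer.request.correlation_id", "5")     // default: -1
    props.put("producer.request.client_id", "my-client")  // default: ""
    props.put("producer.request.required_acks", "1")      // default: 0
    props.put("producer.request.ack_timeout", "500")      // default: 1
    val config = new SyncProducerConfig(props)
    println("acks=" + config.required_acks + " timeout=" + config.ack_timeout)
  }
}

DefaultEventHandler references config.correlation_id, config.client_id, config.required_acks and config.ack_timeout when it assembles the single ProducerRequest it now sends per broker, so these settings are intended to flow through the async path as well, assuming the producer config it receives exposes the same fields.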