Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(revision 1159148)
+++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(working copy)
@@ -243,6 +243,48 @@
 
   }
 
+  @Test
+  def testCollateAndSerializeEvents() {
+    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1,
+                                                   getMessageSetOfSize(List(message2), 5)),
+                                                 new ProducerRequest(topic1, 0,
+                                                   getMessageSetOfSize(List(message1), 5)),
+                                                 new ProducerRequest(topic1, 1,
+                                                   getMessageSetOfSize(List(message1), 5)),
+                                                 new ProducerRequest(topic2, 0,
+                                                   getMessageSetOfSize(List(message2), 5)))))
+
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9092")
+    props.put("queue.size", "50")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("batch.size", "20")
+
+    val config = new AsyncProducerConfig(props)
+
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+
+    producer.start
+    val serializer = new StringSerializer
+    for(i <- 0 until 5) {
+      producer.send(topic2, messageContent2, 0)
+      producer.send(topic2, messageContent2, 1)
+      producer.send(topic1, messageContent1, 0)
+      producer.send(topic1, messageContent1, 1)
+    }
+
+    producer.close
+    EasyMock.verify(basicProducer)
+
+  }
+
   private def getMessageSetOfSize(messages: List[Message], counts: Int): ByteBufferMessageSet = {
     var messageList = new Array[Message](counts)
     for(message <- messages) {
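
Note (not part of the patch): below is a minimal, self-contained sketch of the collation behavior this test exercises, under the assumption that AsyncProducer groups queued events by (topic, partition) so that a single SyncProducer.multiSend call receives one ProducerRequest per pair. The QueuedEvent case class and collate helper are hypothetical, illustrative names, not AsyncProducer's actual internals.

object CollationSketch extends App {
  // Hypothetical stand-in for an event sitting in the async producer's queue.
  case class QueuedEvent(topic: String, partition: Int, data: String)

  // Group queued events by (topic, partition); each group would correspond
  // to one ProducerRequest handed to multiSend in the real producer.
  def collate(events: Seq[QueuedEvent]): Map[(String, Int), Seq[String]] =
    events.groupBy(e => (e.topic, e.partition))
          .map { case (key, evts) => key -> evts.map(_.data) }

  // Five sends to each of four (topic, partition) pairs collapse into four
  // groups of five messages, mirroring the expected multiSend argument in
  // testCollateAndSerializeEvents above.
  val events = for {
    i      <- 0 until 5
    (t, p) <- Seq(("topic2", 0), ("topic2", 1), ("topic1", 0), ("topic1", 1))
  } yield QueuedEvent(t, p, "msg" + i)

  val collated = collate(events)
  assert(collated.size == 4)
  assert(collated.values.forall(_.size == 5))
}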