Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(revision 1413738)
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(working copy)
@@ -85,7 +85,7 @@
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, MessageSet]())
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]())
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(revision 1413738)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(working copy)
@@ -210,12 +210,8 @@
       warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
       messagesPerTopic.keys.toSeq
     } else if(messagesPerTopic.size > 0) {
-      val topicPartitionDataPairs = messagesPerTopic.toSeq.map {
-        case (topicAndPartition, messages) =>
-          (topicAndPartition, messages)
-      }
       val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
-        config.requestTimeoutMs, Map(topicPartitionDataPairs:_*))
+        config.requestTimeoutMs, messagesPerTopic)
       try {
         val syncProducer = producerPool.getProducer(brokerId)
         val response = syncProducer.send(producerRequest)
Index: core/src/main/scala/kafka/api/ProducerRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/ProducerRequest.scala	(revision 1413738)
+++ core/src/main/scala/kafka/api/ProducerRequest.scala	(working copy)
@@ -57,7 +57,7 @@
                        clientId: String,
                        requiredAcks: Short,
                        ackTimeoutMs: Int,
-                       data: Map[TopicAndPartition, MessageSet])
+                       data: Map[TopicAndPartition, ByteBufferMessageSet])
     extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {

   /**
@@ -69,7 +69,7 @@
            clientId: String,
            requiredAcks: Short,
            ackTimeoutMs: Int,
-           data: Map[TopicAndPartition, MessageSet]) =
+           data: Map[TopicAndPartition, ByteBufferMessageSet]) =
     this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)

   def writeTo(buffer: ByteBuffer) {
@@ -88,7 +88,7 @@
     topicAndPartitionData.foreach(partitionAndData => {
       val partition = partitionAndData._1.partition
       val partitionMessageData = partitionAndData._2
-      val bytes = partitionMessageData.asInstanceOf[ByteBufferMessageSet].buffer
+      val bytes = partitionMessageData.buffer
       buffer.putInt(partition)
       buffer.putInt(bytes.limit)
       buffer.put(bytes)
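
For reference, a minimal sketch of how a caller builds a ProducerRequest against the narrowed signature once this patch is applied. The topic name, client id, payload, and ack settings below are hypothetical values chosen only for illustration; they are not taken from the patch.

  import kafka.api.ProducerRequest
  import kafka.common.TopicAndPartition
  import kafka.message.{ByteBufferMessageSet, Message}

  // Hypothetical example values, not part of the patch.
  val correlationId = 0
  val clientId = "example-client"
  val requiredAcks: Short = 1
  val ackTimeoutMs = 3000

  // The data map now carries ByteBufferMessageSet values rather than the MessageSet trait.
  val messages = new ByteBufferMessageSet(new Message("hello".getBytes))
  val data = Map(TopicAndPartition("example-topic", 0) -> messages)

  val request = new ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
  // writeTo can then read the backing buffer of each value directly,
  // which is why the asInstanceOf[ByteBufferMessageSet] cast above becomes unnecessary.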