Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1156940)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy)
@@ -23,9 +23,11 @@
 import kafka.api.ProducerRequest
 import kafka.serializer.Encoder
 import java.util.Properties
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
 import kafka.producer.{ProducerConfig, SyncProducerConfigShared, SyncProducerConfig, SyncProducer}
+import kafka.utils.Utils
+import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
+
 
 private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
                                             val cbkHandler: CallbackHandler[T]) extends EventHandler[T] {
@@ -37,6 +39,7 @@
     var processedEvents = events
     if(cbkHandler != null)
       processedEvents = cbkHandler.beforeSendingData(events)
+
     send(serialize(collate(processedEvents), serializer), syncProducer)
   }
 
@@ -60,34 +63,36 @@
    * If the list of compressed topics is empty, then enable the specified compression codec for all topics
    * If the compression codec is NoCompressionCodec, compression is disabled for all topics
    */
-    val messages = eventsPerTopicMap.map(e => {
-      config.compressionCodec match {
-        case NoCompressionCodec =>
-          if(logger.isDebugEnabled)
-            logger.debug("Sending %d messages with no compression".format(e._2.size))
-          new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
-        case _ =>
-          config.compressedTopics.size match {
-            case 0 =>
-              if(logger.isDebugEnabled)
-                logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
-              new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
-            case _ =>
-              if(config.compressedTopics.contains(e._1._1)) {
+
+    val messagesPerTopicPartition = eventsPerTopicMap.map { topicAndEvents =>
+      ((topicAndEvents._1._1, topicAndEvents._1._2),
+        config.compressionCodec match {
+          case NoCompressionCodec =>
+            if(logger.isDebugEnabled)
+              logger.debug("Sending %d messages with no compression".format(topicAndEvents._2.size))
+            new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
+          case _ =>
+            config.compressedTopics.size match {
+              case 0 =>
                 if(logger.isDebugEnabled)
-                  logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
-                new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
-              }
-              else {
-                if(logger.isDebugEnabled)
-                  logger.debug("Sending %d messages with no compression as %s is not in compressed.topics - %s"
-                    .format(e._2.size, e._1._1, config.compressedTopics.toString))
-                new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
-              }
-          }
-      }
-    })
-    topicsAndPartitions.zip(messages)
+                  logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size, config.compressionCodec.codec))
+                new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
+              case _ =>
+                if(config.compressedTopics.contains(topicAndEvents._1._1)) {
+                  if(logger.isDebugEnabled)
+                    logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size, config.compressionCodec.codec))
+                  new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
+                }
+                else {
+                  if(logger.isDebugEnabled)
+                    logger.debug("Sending %d messages with no compression as %s is not in compressed.topics - %s"
+                      .format(topicAndEvents._2.size, topicAndEvents._1._1, config.compressedTopics.toString))
+                  new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
+                }
+            }
+        })
+    }
+    messagesPerTopicPartition
   }
 
   private def collate(events: Seq[QueueItem[T]]): Map[(String,Int), Seq[T]] = {
@@ -100,8 +105,8 @@
       val topicEvents = remainingEvents partition (e => e.getTopic.equals(topic))
       remainingEvents = topicEvents._2
       distinctPartitions.foreach { p =>
-        val topicPartitionEvents = topicEvents._1 partition (e => (e.getPartition == p))
-        collatedEvents += ( (topic, p) -> topicPartitionEvents._1.map(q => q.getData).toSeq)
+        val topicPartitionEvents = (topicEvents._1 partition (e => (e.getPartition == p)))._1
+        collatedEvents += ( (topic, p) -> topicPartitionEvents.map(q => q.getData))
       }
     }
     collatedEvents
Index: core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (revision 1156940)
+++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (working copy)
@@ -136,13 +136,13 @@
         } catch {
           case e =>
             if (skipMessageOnError)
-              logger.error("error processing message, skipping and resume consumption: " + e)
+              logger.error("error processing message, skipping and resume consumption: ", e)
             else
               throw e
         }
       }
     } catch {
-      case e => logger.error("error processing message, stop consuming: " + e)
+      case e => logger.error("error processing message, stop consuming: ", e)
     }
     System.out.flush()
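
For reference, below is a minimal standalone sketch of the codec-selection rule that the new messagesPerTopicPartition mapping applies per (topic, partition): no configured codec means never compress, an empty compressed.topics list means compress every topic, otherwise compress only topics listed in compressed.topics. The names here (CompressionRuleSketch, codecFor, GZIPCompressionCodec) are illustrative stand-ins and not part of the patch; config.compressionCodec and config.compressedTopics are reduced to plain parameters.

    // Illustration only -- not part of the patch; simplified stand-in types.
    object CompressionRuleSketch {
      sealed trait CompressionCodec
      case object NoCompressionCodec extends CompressionCodec
      case object GZIPCompressionCodec extends CompressionCodec

      // Mirrors the per-topic decision in serialize(): no codec -> never compress;
      // empty compressed.topics -> compress all topics; otherwise compress only
      // topics present in compressed.topics.
      def codecFor(topic: String,
                   configuredCodec: CompressionCodec,
                   compressedTopics: Seq[String]): CompressionCodec =
        configuredCodec match {
          case NoCompressionCodec => NoCompressionCodec
          case codec =>
            if (compressedTopics.isEmpty || compressedTopics.contains(topic)) codec
            else NoCompressionCodec
        }

      def main(args: Array[String]): Unit = {
        val compressedTopics = Seq("clicks")
        println(codecFor("clicks", GZIPCompressionCodec, compressedTopics))      // GZIPCompressionCodec
        println(codecFor("impressions", GZIPCompressionCodec, compressedTopics)) // NoCompressionCodec
        println(codecFor("impressions", GZIPCompressionCodec, Seq.empty))        // GZIPCompressionCodec
        println(codecFor("clicks", NoCompressionCodec, compressedTopics))        // NoCompressionCodec
      }
    }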