diff --git core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index d8ac915..0f7f941 100644
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -95,27 +95,28 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     val partitionedDataOpt = partitionAndCollate(messages)
     partitionedDataOpt match {
       case Some(partitionedData) =>
-        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
-        try {
-          for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
-            if (logger.isTraceEnabled)
-              messagesPerBrokerMap.foreach(partitionAndEvent =>
-                trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
-            val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
-
-            val failedTopicPartitions = send(brokerid, messageSetPerBroker)
-            failedTopicPartitions.foreach(topicPartition => {
-              messagesPerBrokerMap.get(topicPartition) match {
-                case Some(data) => failedProduceRequests.appendAll(data)
-                case None => // nothing
-              }
-            })
+        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
+        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
+          if (logger.isTraceEnabled) {
+            messagesPerBrokerMap.foreach(partitionAndEvent =>
+              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+          }
+          val messageSetPerBrokerOpt = groupMessagesToSet(messagesPerBrokerMap)
+          messageSetPerBrokerOpt match {
+            case Some(messageSetPerBroker) =>
+              val failedTopicPartitions = send(brokerid, messageSetPerBroker)
+              failedTopicPartitions.foreach(topicPartition => {
+                messagesPerBrokerMap.get(topicPartition) match {
+                  case Some(data) => failedProduceRequests.appendAll(data)
+                  case None => // nothing
+                }
+              })
+            case None => // failed to group messages
+              messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m))
           }
-        } catch {
-          case t: Throwable => error("Failed to send messages", t)
         }
         failedProduceRequests
-      case None => // all produce requests failed
+      case None => // failed to collate messages
         messages
     }
   }
@@ -290,43 +291,46 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }
   }
 
-  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
+  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]) = {
     /** enforce the compressed.topics config here.
-     * If the compression codec is anything other than NoCompressionCodec,
-     * Enable compression only for specified topics if any
-     * If the list of compressed topics is empty, then enable the specified compression codec for all topics
-     * If the compression codec is NoCompressionCodec, compression is disabled for all topics
+     * If the compression codec is anything other than NoCompressionCodec,
+     * Enable compression only for specified topics if any
+     * If the list of compressed topics is empty, then enable the specified compression codec for all topics
+     * If the compression codec is NoCompressionCodec, compression is disabled for all topics
      */
-
-    val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
-      val rawMessages = messages.map(_.message)
-      ( topicAndPartition,
-        config.compressionCodec match {
-          case NoCompressionCodec =>
-            debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
-            new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
-          case _ =>
-            config.compressedTopics.size match {
-              case 0 =>
-                debug("Sending %d messages with compression codec %d to %s"
-                  .format(messages.size, config.compressionCodec.codec, topicAndPartition))
-                new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
-              case _ =>
-                if(config.compressedTopics.contains(topicAndPartition.topic)) {
+    try {
+      val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
+        val rawMessages = messages.map(_.message)
+        (topicAndPartition,
+          config.compressionCodec match {
+            case NoCompressionCodec =>
+              debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
+              new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
+            case _ =>
+              config.compressedTopics.size match {
+                case 0 =>
                   debug("Sending %d messages with compression codec %d to %s"
                     .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                   new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
-                }
-                else {
-                  debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
-                    .format(messages.size, topicAndPartition, config.compressedTopics.toString))
-                  new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
-                }
-            }
-        }
-      )
+                case _ =>
+                  if (config.compressedTopics.contains(topicAndPartition.topic)) {
+                    debug("Sending %d messages with compression codec %d to %s"
+                      .format(messages.size, config.compressionCodec.codec, topicAndPartition))
+                    new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
+                  }
+                  else {
+                    debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
+                      .format(messages.size, topicAndPartition, config.compressedTopics.toString))
+                    new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
+                  }
+              }
+          }
+        )
+      }
+      Some(messagesPerTopicPartition)
+    } catch {
+      case t: Throwable => error("Failed to group messages", t); None
     }
-    messagesPerTopicPartition
   }
 
   def close() {