From 78d3fbb439258d656937a0f9ed2b58283ccf2cf4 Mon Sep 17 00:00:00 2001
From: Guozhang Wang <guwang@linkedin.com>
Date: Mon, 5 Aug 2013 14:08:32 -0700
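Subject: [PATCH 1/2] K998, phase 1: stop producer send retries when a failed partition reports a fatal error code (MessageSizeTooLarge)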

---
 .../src/main/scala/kafka/common/ErrorMapping.scala |    2 +
 .../kafka/producer/async/DefaultEventHandler.scala |   42 ++++++++++++--------
 2 files changed, 27 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 153bc0b..954a31f 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -70,4 +70,6 @@ object ErrorMapping {
       throw codeToException(code).newInstance()
 
   def exceptionFor(code: Short) : Throwable = codeToException(code).newInstance()
+
+  def fatalException(code: Short) : Boolean = code == MessageSizeTooLargeCode // true only for MessageSizeTooLargeCode; DefaultEventHandler stops retrying a send when this returns true
 }
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 48ddb6a..6ebb1ac 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -60,9 +60,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       }
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.messageSendMaxRetries + 1
+      var stillNeedRetry = true
       val correlationIdStart = correlationId.get()
       debug("Handling %d events".format(events.size))
-      while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
+      while (remainingRetries > 0 && outstandingProduceRequests.size > 0 && stillNeedRetry) {
         topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
         if (topicMetadataRefreshInterval >= 0 &&
             SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
@@ -70,8 +71,10 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           topicMetadataToRefresh.clear
           lastTopicMetadataRefreshTime = SystemTime.milliseconds
         }
-        outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
-        if (outstandingProduceRequests.size > 0) {
+        val failedProduceRequestsAndCanRetry = dispatchSerializedData(outstandingProduceRequests)
+        outstandingProduceRequests = failedProduceRequestsAndCanRetry._1
+        stillNeedRetry = failedProduceRequestsAndCanRetry._2
+        if (outstandingProduceRequests.size > 0 && stillNeedRetry) {
           info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
           // back off and update the topic metadata cache before attempting another send operation
           Thread.sleep(config.retryBackoffMs)
@@ -92,11 +95,12 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }
   }
 
-  private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
+  private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): (Seq[KeyedMessage[K, Message]], Boolean) = {
     val partitionedDataOpt = partitionAndCollate(messages)
     partitionedDataOpt match {
       case Some(partitionedData) =>
         val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
+        var needRetry = true
         try {
           for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
             if (logger.isTraceEnabled)
@@ -104,10 +108,13 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
                 trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
             val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
 
-            val failedTopicPartitions = send(brokerid, messageSetPerBroker)
-            failedTopicPartitions.foreach(topicPartition => {
-              messagesPerBrokerMap.get(topicPartition) match {
-                case Some(data) => failedProduceRequests.appendAll(data)
+            val failedTopicPartitionsAndErrorCode = send(brokerid, messageSetPerBroker)
+            failedTopicPartitionsAndErrorCode.foreach(topicPartitionAndError => {
+              messagesPerBrokerMap.get(topicPartitionAndError._1) match {
+                case Some(data) => {
+                  failedProduceRequests.appendAll(data)
+                  needRetry = needRetry && !fatalException(topicPartitionAndError._2)
+                }
                 case None => // nothing
               }
             })
@@ -115,9 +122,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
         } catch {
           case t: Throwable => error("Failed to send messages", t)
         }
-        failedProduceRequests
+        (failedProduceRequests, needRetry)
       case None => // all produce requests failed
-        messages
+        (messages, true)
     }
   }
 
@@ -229,7 +236,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
    * @param messagesPerTopic the messages as a map from (topic, partition) -> messages
    * @return the set (topic, partitions) messages which incurred an error sending or processing
    */
-  private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
+  private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) : Seq[(TopicAndPartition, Short)] = {
     if(brokerId < 0) {
       warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
       messagesPerTopic.keys.toSeq
@@ -237,7 +244,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       val currentCorrelationId = correlationId.getAndIncrement
       val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
         config.requestTimeoutMs, messagesPerTopic)
-      var failedTopicPartitions = Seq.empty[TopicAndPartition]
+      var failedTopicPartitionsAndErrorCode = Seq.empty[(TopicAndPartition, Short)]
       try {
         val syncProducer = producerPool.getProducer(brokerId)
         debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
@@ -254,8 +261,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
               trace("Successfully sent message: %s".format(if(message.message.isNull) null else Utils.readString(message.message.payload)))))
           }
           val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
-          failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
-          if(failedTopicPartitions.size > 0) {
+          failedTopicPartitionsAndErrorCode = failedPartitionsAndStatus.map(partitionStatus => (partitionStatus._1, partitionStatus._2.error))
+          if(failedTopicPartitionsAndErrorCode.size > 0) {
             val errorString = failedPartitionsAndStatus
               .sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
                                     (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
@@ -265,15 +272,16 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
               }.mkString(",")
             warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
           }
-          failedTopicPartitions
+          failedTopicPartitionsAndErrorCode
         } else {
-          Seq.empty[TopicAndPartition]
+          Seq.empty[(TopicAndPartition, Short)]
         }
       } catch {
         case t: Throwable =>
           warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
             .format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(",")), t)
-          messagesPerTopic.keys.toSeq
+          messagesPerTopic.keys.map(topicAndPartition => (topicAndPartition, ErrorMapping.UnknownCode)).toSeq
+
       }
     } else {
       List.empty
-- 
1.7.1


From fd2eb984e727b54fd569684c51186f458fea039b Mon Sep 17 00:00:00 2001
From: Guozhang Wang <guwang@linkedin.com>
Date: Mon, 5 Aug 2013 14:29:40 -0700
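Subject: [PATCH 2/2] phase 1 compile: fix compile errors (qualify ErrorMapping.fatalException; return (partition, error code) pairs when there is no leader)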

---
 .../kafka/producer/async/DefaultEventHandler.scala |    4 ++--
 1 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 6ebb1ac..320b6cc 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -113,7 +113,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
               messagesPerBrokerMap.get(topicPartitionAndError._1) match {
                 case Some(data) => {
                   failedProduceRequests.appendAll(data)
-                  needRetry = needRetry && !fatalException(topicPartitionAndError._2)
+                  needRetry = needRetry && !ErrorMapping.fatalException(topicPartitionAndError._2)
                 }
                 case None => // nothing
               }
@@ -239,7 +239,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) : Seq[(TopicAndPartition, Short)] = {
     if(brokerId < 0) {
       warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
-      messagesPerTopic.keys.toSeq
+      messagesPerTopic.keys.map(topicAndPartition => (topicAndPartition, ErrorMapping.LeaderNotAvailableCode)).toSeq
     } else if(messagesPerTopic.size > 0) {
       val currentCorrelationId = correlationId.getAndIncrement
       val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
-- 
1.7.1

