From c10fe4d24d930cea2e05a518d6cdb3acc8ea3faf Mon Sep 17 00:00:00 2001
From: Rajasekar Elango <relango@saleforce.com>
Date: Thu, 29 Aug 2013 09:02:53 -0700
Subject: [PATCH 1/2] Add message-send-max-retries and retry-backoff-ms
 options to console producer

---
 .../scala/kafka/producer/ConsoleProducer.scala     |   13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 5539bce..5329f3b 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -44,6 +44,14 @@ object ConsoleProducer {
                              .describedAs("size")
                              .ofType(classOf[java.lang.Integer])
                              .defaultsTo(200)
+    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "The leader may be unavailable transiently, which can fail the sending of a message. This property specifies the number of retries when such failures occur.")
+                             .withRequiredArg
+                             .ofType(classOf[java.lang.Integer])
+                             .defaultsTo(3)
+    val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.")
+                             .withRequiredArg
+                             .ofType(classOf[java.lang.Long])
+                             .defaultsTo(100)
     val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + 
                                                    " a message will queue awaiting suffient batch size. The value is given in ms.")
                                .withRequiredArg
@@ -97,7 +105,7 @@ object ConsoleProducer {
                             .withRequiredArg
                             .describedAs("prop")
                             .ofType(classOf[String])
-
+                            
 
     val options = parser.parse(args : _*)
     for(arg <- List(topicOpt, brokerListOpt)) {
@@ -132,6 +140,9 @@ object ConsoleProducer {
     props.put("producer.type", if(sync) "sync" else "async")
     if(options.has(batchSizeOpt))
       props.put("batch.num.messages", batchSize.toString)
+    
+    props.put("message.send.max.retries", options.valueOf(messageSendMaxRetriesOpt).toString)
+    props.put("retry.backoff.ms", options.valueOf(retryBackoffMsOpt).toString)
     props.put("queue.buffering.max.ms", sendTimeout.toString)
     props.put("queue.buffering.max.messages", queueSize.toString)
     props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)
-- 
1.7.9.5


From df8e077b951d950331ff33d7ddf6876ca3a0b033 Mon Sep 17 00:00:00 2001
From: Rajasekar Elango <relango@saleforce.com>
Date: Fri, 30 Aug 2013 13:53:08 -0700
Subject: [PATCH 2/2] Update description for message.send.max.retries option

---
 .../scala/kafka/producer/ConsoleProducer.scala     |    2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 5329f3b..00cb2e8 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -44,7 +44,7 @@ object ConsoleProducer {
                              .describedAs("size")
                              .ofType(classOf[java.lang.Integer])
                              .defaultsTo(200)
-    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "The leader may be unavailable transiently, which can fail the sending of a message. This property specifies the number of retries when such failures occur.")
+    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message.")
                              .withRequiredArg
                              .ofType(classOf[java.lang.Integer])
                              .defaultsTo(3)
-- 
1.7.9.5

