From c10fe4d24d930cea2e05a518d6cdb3acc8ea3faf Mon Sep 17 00:00:00 2001
From: Rajasekar Elango <relango@saleforce.com>
Date: Thu, 29 Aug 2013 09:02:53 -0700
Subject: [PATCH] Add message-send-max-retries and retry-backoff-ms options to
 console producer

---
 .../scala/kafka/producer/ConsoleProducer.scala     |   13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 5539bce..5329f3b 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -44,6 +44,14 @@ object ConsoleProducer {
                              .describedAs("size")
                              .ofType(classOf[java.lang.Integer])
                              .defaultsTo(200)
+    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "The leader may be unavailable transiently, which can fail the sending of a message. This property specifies the number of retries when such failures occur.")
+                             .withRequiredArg
+                             .ofType(classOf[java.lang.Integer])
+                             .defaultsTo(3)
+    val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.")
+                             .withRequiredArg
+                             .ofType(classOf[java.lang.Long])
+                             .defaultsTo(100)
     val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + 
                                                    " a message will queue awaiting suffient batch size. The value is given in ms.")
                                .withRequiredArg
@@ -97,7 +105,7 @@ object ConsoleProducer {
                             .withRequiredArg
                             .describedAs("prop")
                             .ofType(classOf[String])
-
+                            
 
     val options = parser.parse(args : _*)
     for(arg <- List(topicOpt, brokerListOpt)) {
@@ -132,6 +140,9 @@ object ConsoleProducer {
     props.put("producer.type", if(sync) "sync" else "async")
     if(options.has(batchSizeOpt))
       props.put("batch.num.messages", batchSize.toString)
+    
+    props.put("message.send.max.retries", options.valueOf(messageSendMaxRetriesOpt).toString)
+    props.put("retry.backoff.ms", options.valueOf(retryBackoffMsOpt).toString)
     props.put("queue.buffering.max.ms", sendTimeout.toString)
     props.put("queue.buffering.max.messages", queueSize.toString)
     props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)
-- 
1.7.9.5

