diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 8c32115..558b494 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -50,6 +50,27 @@ object ConsoleProducer {
                                .describedAs("timeout_ms")
                                .ofType(classOf[java.lang.Long])
                                .defaultsTo(1000)
+    val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " + 
+                                                   " messages will queue awaiting suffient batch size.")
+                               .withRequiredArg
+                               .describedAs("queue_size")
+                               .ofType(classOf[java.lang.Long])
+                               .defaultsTo(10000)
+    val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue")
+                               .withRequiredArg
+                               .describedAs("queue enqueuetimeout ms")
+                               .ofType(classOf[java.lang.Long])
+                               .defaultsTo(0)
+    val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
+                               .withRequiredArg
+                               .describedAs("request required acks")
+                               .ofType(classOf[java.lang.Integer])
+                               .defaultsTo(0)
+    val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero")
+                               .withRequiredArg
+                               .describedAs("request timeout ms")
+                               .ofType(classOf[java.lang.Integer])
+                               .defaultsTo(1500)
     val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
                                  .withRequiredArg
                                  .describedAs("encoder_class")
@@ -88,6 +109,10 @@ object ConsoleProducer {
     val compress = options.has(compressOpt)
     val batchSize = options.valueOf(batchSizeOpt)
     val sendTimeout = options.valueOf(sendTimeoutOpt)
+    val queueSize = options.valueOf(queueSizeOpt)
+    val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)
+    val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
+    val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
     val keyEncoderClass = options.valueOf(keyEncoderOpt)
     val valueEncoderClass = options.valueOf(valueEncoderOpt)
     val readerClass = options.valueOf(messageReaderOpt)
@@ -102,6 +127,10 @@ object ConsoleProducer {
     if(options.has(batchSizeOpt))
       props.put("batch.size", batchSize.toString)
     props.put("queue.time", sendTimeout.toString)
+    props.put("queue.size", queueSize.toString)
+    props.put("queue.enqueueTimeout.ms", queueEnqueueTimeoutMs.toString)
+    props.put("producer.request.required.acks", requestRequiredAcks.toString)
+    props.put("producer.request.timeout.ms", requestTimeoutMs.toString)
     props.put("key.serializer.class", keyEncoderClass)
     props.put("serializer.class", valueEncoderClass)
 
