diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 28de573..2b0394a 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -38,7 +38,11 @@ object ConsoleProducer {
                            .describedAs("broker-list")
                            .ofType(classOf[String])
     val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
-    val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
+    val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
+                                    .withRequiredArg
+                                    .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2")
+                                    .ofType(classOf[java.lang.Integer])
+                                    .defaultsTo(0)
     val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
                              .withRequiredArg
                              .describedAs("size")
@@ -119,7 +123,7 @@ object ConsoleProducer {
     val topic = options.valueOf(topicOpt)
     val brokerList = options.valueOf(brokerListOpt)
     val sync = options.has(syncOpt)
-    val compress = options.has(compressOpt)
+    val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
     val batchSize = options.valueOf(batchSizeOpt)
     val sendTimeout = options.valueOf(sendTimeoutOpt)
     val queueSize = options.valueOf(queueSizeOpt)
@@ -135,8 +139,7 @@ object ConsoleProducer {
 
     val props = new Properties()
     props.put("metadata.broker.list", brokerList)
-    val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
-    props.put("compression.codec", codec.toString)
+    props.put("compression.codec", compressionCodec.codec.toString)
     props.put("producer.type", if(sync) "sync" else "async")
     if(options.has(batchSizeOpt))
       props.put("batch.num.messages", batchSize.toString)
