Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala =================================================================== --- core/src/main/scala/kafka/producer/ConsoleProducer.scala (revision 1291490) +++ core/src/main/scala/kafka/producer/ConsoleProducer.scala (working copy) @@ -36,7 +36,7 @@ .withRequiredArg .describedAs("connection_string") .ofType(classOf[String]) - val asyncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") + val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed") val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.") .withRequiredArg @@ -78,7 +78,7 @@ val topic = options.valueOf(topicOpt) val zkConnect = options.valueOf(zkConnectOpt) - val async = options.has(asyncOpt) + val sync = options.has(syncOpt) val compress = options.has(compressOpt) val batchSize = options.valueOf(batchSizeOpt) val sendTimeout = options.valueOf(sendTimeoutOpt) @@ -89,10 +89,10 @@ val props = new Properties() props.put("zk.connect", zkConnect) props.put("compression.codec", DefaultCompressionCodec.codec.toString) - props.put("producer.type", if(async) "async" else "sync") + props.put("producer.type", if(sync) "sync" else "async") if(options.has(batchSizeOpt)) - props.put("batch.size", batchSize) - props.put("queue.enqueueTimeout.ms", sendTimeout.toString) + props.put("batch.size", batchSize.toString) + props.put("queue.time", sendTimeout.toString) props.put("serializer.class", encoderClass) val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]