Index: system_test/producer_perf/bin/run-compression-test.sh =================================================================== --- system_test/producer_perf/bin/run-compression-test.sh (revision 1160894) +++ system_test/producer_perf/bin/run-compression-test.sh (working copy) @@ -14,7 +14,7 @@ sleep 4 echo "start producing $num_messages messages ..." -$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --delay-btw-batch-ms 10 --compression-codec 1 +$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --compression-codec 1 echo "wait for data to be persisted" cur_offset="-1" Index: system_test/producer_perf/bin/run-test.sh =================================================================== --- system_test/producer_perf/bin/run-test.sh (revision 1160894) +++ system_test/producer_perf/bin/run-test.sh (working copy) @@ -14,7 +14,7 @@ sleep 4 echo "start producing $num_messages messages ..." -$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --delay-btw-batch-ms 10 +$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topic test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async echo "wait for data to be persisted" cur_offset="-1" Index: system_test/embedded_consumer/bin/run-test.sh =================================================================== --- system_test/embedded_consumer/bin/run-test.sh (revision 1160894) +++ system_test/embedded_consumer/bin/run-test.sh (working copy) @@ -111,7 +111,7 @@ start_producer() { topic=$1 info "start producing messages for topic $topic ..." - $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async --delay-btw-batch-ms 10 2>&1 > $base_dir/producer_performance.log & + $base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo zk.connect=localhost:2181 --topic $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log & pid_producer=$! } Index: core/src/main/scala/kafka/tools/ProducerPerformance.scala =================================================================== --- core/src/main/scala/kafka/tools/ProducerPerformance.scala (revision 1160894) +++ core/src/main/scala/kafka/tools/ProducerPerformance.scala (working copy) @@ -83,11 +83,6 @@ .defaultsTo(100) val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.") val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.") - val delayMSBtwBatchOpt = parser.accepts("delay-btw-batch-ms", "Delay in ms between 2 batch sends.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Long]) - .defaultsTo(0) val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.") .withRequiredArg .describedAs("size") @@ -122,7 +117,6 @@ val messageSize = options.valueOf(messageSizeOpt).intValue val isFixSize = !options.has(varyMessageSizeOpt) val isAsync = options.has(asyncOpt) - val delayedMSBtwSend = options.valueOf(delayMSBtwBatchOpt).longValue var batchSize = options.valueOf(batchSizeOpt).intValue val numThreads = options.valueOf(numThreadsOpt).intValue val topic = options.valueOf(topicOpt) @@ -156,7 +150,7 @@ props.put("batch.size", config.batchSize.toString) props.put("reconnect.interval", Integer.MAX_VALUE.toString) props.put("buffer.size", (64*1024).toString) - + props.put("queue.enqueueTimeout.ms", "-1") logger.info("Producer properties = " + props.toString) val producerConfig = new ProducerConfig(props) @@ -183,8 +177,6 @@ bytesSent += config.messageSize try { producer.send(new ProducerData[String,String](config.topic, message)) - if (config.delayedMSBtwSend > 0 && (nSends + 1) % config.batchSize == 0) - Thread.sleep(config.delayedMSBtwSend) nSends += 1 }catch { case e: Exception => e.printStackTrace @@ -253,8 +245,6 @@ bytesSent += config.batchSize*config.messageSize try { producer.send(new ProducerData[String,String](config.topic, messageSet)) - if (config.delayedMSBtwSend > 0 && (nSends + 1) % config.batchSize == 0) - Thread.sleep(config.delayedMSBtwSend) nSends += 1 }catch { case e: Exception => e.printStackTrace