diff --git a/bin/kafka-console-consumer.sh b/bin/kafka-console-consumer.sh
index e410dde..07c90a9 100755
--- a/bin/kafka-console-consumer.sh
+++ b/bin/kafka-console-consumer.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.consumer.ConsoleConsumer $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer $@
diff --git a/bin/kafka-console-producer.sh b/bin/kafka-console-producer.sh
index cd8ce62..ccca66d 100755
--- a/bin/kafka-console-producer.sh
+++ b/bin/kafka-console-producer.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,4 +17,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.producer.ConsoleProducer $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer $@
diff --git a/bin/kafka-consumer-perf-test.sh b/bin/kafka-consumer-perf-test.sh
index 4ed3ed9..ebc513a 100755
--- a/bin/kafka-consumer-perf-test.sh
+++ b/bin/kafka-consumer-perf-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,4 +17,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.perf.ConsumerPerformance $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance $@
diff --git a/bin/kafka-producer-perf-test.sh b/bin/kafka-producer-perf-test.sh
index b4efc29..84ac949 100755
--- a/bin/kafka-producer-perf-test.sh
+++ b/bin/kafka-producer-perf-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,4 +17,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.perf.ProducerPerformance $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ProducerPerformance $@
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index d2fc8c0..5d5021d 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -41,11 +41,6 @@ do
 CLASSPATH=$CLASSPATH:$file
 done
 
-for file in $base_dir/perf/build/libs//kafka-perf_${SCALA_VERSION}*.jar;
-do
- CLASSPATH=$CLASSPATH:$file
-done
-
 for file in $base_dir/examples/build/libs//kafka-examples*.jar;
 do
 CLASSPATH=$CLASSPATH:$file
@@ -155,6 +150,3 @@ if [ "x$DAEMON_MODE" = "xtrue" ]; then
 else
 exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
 fi
-
-
-
diff --git a/bin/kafka-simple-consumer-perf-test.sh b/bin/kafka-simple-consumer-perf-test.sh
index 2d3e3d3..b1a5cfc 100755
--- a/bin/kafka-simple-consumer-perf-test.sh
+++ b/bin/kafka-simple-consumer-perf-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.perf.SimpleConsumerPerformance $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerPerformance $@
diff --git a/bin/windows/kafka-run-class.bat b/bin/windows/kafka-run-class.bat
index f4d2904..2f781c6 100644
--- a/bin/windows/kafka-run-class.bat
+++ b/bin/windows/kafka-run-class.bat
@@ -36,11 +36,6 @@ for %%i in (%BASE_DIR%\core\build\dependant-libs-%SCALA_VERSION%\*.jar) do (
 call :concat %%i
 )
 
-rem Classpath addition for kafka-perf dependencies
-for %%i in (%BASE_DIR%\perf\build\dependant-libs-%SCALA_VERSION%\*.jar) do (
- call :concat %%i
-)
-
 rem Classpath addition for kafka-clients
 for %%i in (%BASE_DIR%\clients\build\libs\kafka-clients-*.jar) do (
 call :concat %%i
@@ -126,4 +121,4 @@ IF ["%CLASSPATH%"] EQU [""] (
 set CLASSPATH="%1"
 ) ELSE (
 set CLASSPATH=%CLASSPATH%;"%1"
-)
\ No newline at end of file
+)
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
deleted file mode 100644
index 1a16c69..0000000
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ /dev/null
@@ -1,284 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.consumer - -import scala.collection.JavaConversions._ -import org.I0Itec.zkclient._ -import joptsimple._ -import java.util.Properties -import java.util.Random -import java.io.PrintStream -import kafka.message._ -import kafka.serializer._ -import kafka.utils._ -import kafka.metrics.KafkaMetricsReporter - - -/** - * Consumer that dumps messages out to standard out. - * - */ -object ConsoleConsumer extends Logging { - - def main(args: Array[String]) { - val parser = new OptionParser - val topicIdOpt = parser.accepts("topic", "The topic id to consume on.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.") - .withRequiredArg - .describedAs("whitelist") - .ofType(classOf[String]) - val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.") - .withRequiredArg - .describedAs("blacklist") - .ofType(classOf[String]) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - - val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") - .withRequiredArg - .describedAs("config file") - .ofType(classOf[String]) - val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.") - .withRequiredArg - .describedAs("class") - .ofType(classOf[String]) - .defaultsTo(classOf[DefaultMessageFormatter].getName) - val messageFormatterArgOpt = parser.accepts("property") - .withRequiredArg - .describedAs("prop") - .ofType(classOf[String]) - val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up"); - val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + - "start with the earliest message present in the log rather than the latest message.") - val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. 
If not set, consumption is continual.") - .withRequiredArg - .describedAs("num_messages") - .ofType(classOf[java.lang.Integer]) - val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + - "skip it instead of halt.") - val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") - val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + - "set, the csv metrics will be outputed here") - .withRequiredArg - .describedAs("metrics dictory") - .ofType(classOf[java.lang.String]) - - var groupIdPassed = true - val options: OptionSet = tryParse(parser, args) - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) - val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) - if (topicOrFilterOpt.size != 1) { - error("Exactly one of whitelist/blacklist/topic is required.") - parser.printHelpOn(System.err) - System.exit(1) - } - val topicArg = options.valueOf(topicOrFilterOpt.head) - val filterSpec = if (options.has(blacklistOpt)) - new Blacklist(topicArg) - else - new Whitelist(topicArg) - - val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) - if (csvMetricsReporterEnabled) { - val csvReporterProps = new Properties() - csvReporterProps.put("kafka.metrics.polling.interval.secs", "5") - csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") - if (options.has(metricsDirectoryOpt)) - csvReporterProps.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) - else - csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics") - csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true") - val verifiableProps = new VerifiableProperties(csvReporterProps) - KafkaMetricsReporter.startReporters(verifiableProps) - } - - - - val consumerProps = if (options.has(consumerConfigOpt)) - Utils.loadProps(options.valueOf(consumerConfigOpt)) - else - new Properties() - - if(!consumerProps.containsKey("group.id")) { - consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000)) - groupIdPassed=false - } - consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") - consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt)) - if(!consumerProps.containsKey("dual.commit.enabled")) - consumerProps.put("dual.commit.enabled","false") - if(!consumerProps.containsKey("offsets.storage")) - consumerProps.put("offsets.storage","zookeeper") - - if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) && - checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) { - System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id") - +". 
Please use --delete-consumer-offsets to delete previous offsets metadata") - System.exit(1) - } - - if(options.has(deleteConsumerOffsetsOpt)) - ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id")) - - val config = new ConsumerConfig(consumerProps) - val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false - val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) - val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt)) - val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 - val connector = Consumer.create(config) - - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run() { - connector.shutdown() - // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack - if(!groupIdPassed) - ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id")) - } - }) - - var numMessages = 0L - val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] - formatter.init(formatterArgs) - try { - val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) - val iter = if(maxMessages >= 0) - stream.slice(0, maxMessages) - else - stream - - for(messageAndTopic <- iter) { - try { - formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) - numMessages += 1 - } catch { - case e: Throwable => - if (skipMessageOnError) - error("Error processing message, skipping this message: ", e) - else - throw e - } - if(System.out.checkError()) { - // This means no one is listening to our output stream any more, time to shutdown - System.err.println("Unable to write to standard out, closing consumer.") - System.err.println("Consumed %d messages".format(numMessages)) - formatter.close() - connector.shutdown() - System.exit(1) - } - } - } catch { - case e: Throwable => error("Error processing message, stopping consumer: ", e) - } - System.err.println("Consumed %d messages".format(numMessages)) - System.out.flush() - formatter.close() - connector.shutdown() - } - - def tryParse(parser: OptionParser, args: Array[String]) = { - try { - parser.parse(args : _*) - } catch { - case e: OptionException => { - Utils.croak(e.getMessage) - null - } - } - } - - def checkZkPathExists(zkUrl: String, path: String): Boolean = { - try { - val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer); - zk.exists(path) - } catch { - case _: Throwable => false - } - } -} - -object MessageFormatter { - def tryParseFormatterArgs(args: Iterable[String]): Properties = { - val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0) - if(!splits.forall(_.length == 2)) { - System.err.println("Invalid parser arguments: " + args.mkString(" ")) - System.exit(1) - } - val props = new Properties - for(a <- splits) - props.put(a(0), a(1)) - props - } -} - -trait MessageFormatter { - def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) - def init(props: Properties) {} - def close() {} -} - -class DefaultMessageFormatter extends MessageFormatter { - var printKey = false - var keySeparator = "\t".getBytes - var lineSeparator = "\n".getBytes - - override def init(props: Properties) { - if(props.containsKey("print.key")) - printKey = props.getProperty("print.key").trim.toLowerCase.equals("true") - 
if(props.containsKey("key.separator")) - keySeparator = props.getProperty("key.separator").getBytes - if(props.containsKey("line.separator")) - lineSeparator = props.getProperty("line.separator").getBytes - } - - def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { - if(printKey) { - output.write(if (key == null) "null".getBytes() else key) - output.write(keySeparator) - } - output.write(if (value == null) "null".getBytes() else value) - output.write(lineSeparator) - } -} - -class NoOpMessageFormatter extends MessageFormatter { - override def init(props: Properties) {} - def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {} -} - -class ChecksumMessageFormatter extends MessageFormatter { - private var topicStr: String = _ - - override def init(props: Properties) { - topicStr = props.getProperty("topic") - if (topicStr != null) - topicStr = topicStr + ":" - else - topicStr = "" - } - - def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { - val chksum = new Message(value, key).checksum - output.println(topicStr + "checksum:" + chksum) - } -} diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala deleted file mode 100644 index a2af988..0000000 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ /dev/null @@ -1,299 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.producer - -import kafka.common._ -import kafka.message._ -import kafka.serializer._ -import kafka.utils.CommandLineUtils - -import java.util.Properties -import java.io._ - -import joptsimple._ - -object ConsoleProducer { - - def main(args: Array[String]) { - - val config = new ProducerConfig(args) - val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader] - val props = new Properties - props.put("topic", config.topic) - props.putAll(config.cmdLineProps) - reader.init(System.in, props) - - try { - val producer = - if(config.useNewProducer) { - import org.apache.kafka.clients.producer.ProducerConfig - - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec) - props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString) - props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString) - props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString) - props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString) - props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString) - props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString) - props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString) - props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString) - if(config.queueEnqueueTimeoutMs != -1) - props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString) - props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString) - props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") - - new NewShinyProducer(props) - } else { - props.put("metadata.broker.list", config.brokerList) - props.put("compression.codec", config.compressionCodec) - props.put("producer.type", if(config.sync) "sync" else "async") - props.put("batch.num.messages", config.batchSize.toString) - props.put("message.send.max.retries", config.messageSendMaxRetries.toString) - props.put("retry.backoff.ms", config.retryBackoffMs.toString) - props.put("queue.buffering.max.ms", config.sendTimeout.toString) - props.put("queue.buffering.max.messages", config.queueSize.toString) - props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString) - props.put("request.required.acks", config.requestRequiredAcks.toString) - props.put("request.timeout.ms", config.requestTimeoutMs.toString) - props.put("key.serializer.class", config.keyEncoderClass) - props.put("serializer.class", config.valueEncoderClass) - props.put("send.buffer.bytes", config.socketBuffer.toString) - props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString) - props.put("client.id", "console-producer") - - new OldProducer(props) - } - - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run() { - producer.close() - } - }) - - var message: KeyedMessage[Array[Byte], Array[Byte]] = null - do { - message = reader.readMessage() - if(message != null) - producer.send(message.topic, message.key, message.message) - } while(message != null) - } catch { - case e: Exception => - e.printStackTrace - System.exit(1) - } - System.exit(0) - } - - class ProducerConfig(args: Array[String]) { - val parser = new OptionParser - val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.") - .withRequiredArg - 
.describedAs("topic") - .ofType(classOf[String]) - val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.") - .withRequiredArg - .describedAs("broker-list") - .ofType(classOf[String]) - val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") - val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." + - "If specified without value, than it defaults to 'gzip'") - .withOptionalArg() - .describedAs("compression-codec") - .ofType(classOf[String]) - val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(200) - val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message.") - .withRequiredArg - .ofType(classOf[java.lang.Integer]) - .defaultsTo(3) - val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.") - .withRequiredArg - .ofType(classOf[java.lang.Long]) - .defaultsTo(100) - val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + - " a message will queue awaiting suffient batch size. The value is given in ms.") - .withRequiredArg - .describedAs("timeout_ms") - .ofType(classOf[java.lang.Long]) - .defaultsTo(1000) - val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " + - " messages will queue awaiting suffient batch size.") - .withRequiredArg - .describedAs("queue_size") - .ofType(classOf[java.lang.Long]) - .defaultsTo(10000) - val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue") - .withRequiredArg - .describedAs("queue enqueuetimeout ms") - .ofType(classOf[java.lang.Long]) - .defaultsTo(Int.MaxValue) - val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests") - .withRequiredArg - .describedAs("request required acks") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) - val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. 
Value must be non-negative and non-zero") - .withRequiredArg - .describedAs("request timeout ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1500) - val metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms", - "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes.") - .withRequiredArg - .describedAs("metadata expiration interval") - .ofType(classOf[java.lang.Long]) - .defaultsTo(5*60*1000L) - val metadataFetchTimeoutMsOpt = parser.accepts("metadata-fetch-timeout-ms", - "The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that topic.") - .withRequiredArg - .describedAs("metadata fetch timeout") - .ofType(classOf[java.lang.Long]) - .defaultsTo(60*1000L) - val maxMemoryBytesOpt = parser.accepts("max-memory-bytes", - "The total memory used by the producer to buffer records waiting to be sent to the server.") - .withRequiredArg - .describedAs("total memory in bytes") - .ofType(classOf[java.lang.Long]) - .defaultsTo(32 * 1024 * 1024L) - val maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes", - "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " + - "will attempt to optimistically group them together until this size is reached.") - .withRequiredArg - .describedAs("memory in bytes per partition") - .ofType(classOf[java.lang.Long]) - .defaultsTo(16 * 1024L) - val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.") - .withRequiredArg - .describedAs("encoder_class") - .ofType(classOf[java.lang.String]) - .defaultsTo(classOf[DefaultEncoder].getName) - val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.") - .withRequiredArg - .describedAs("encoder_class") - .ofType(classOf[java.lang.String]) - .defaultsTo(classOf[DefaultEncoder].getName) - val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " + - "By default each line is read as a separate message.") - .withRequiredArg - .describedAs("reader_class") - .ofType(classOf[java.lang.String]) - .defaultsTo(classOf[LineMessageReader].getName) - val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024*100) - val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. 
" + - "This allows custom configuration for a user-defined message reader.") - .withRequiredArg - .describedAs("prop") - .ofType(classOf[String]) - val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") - - val options = parser.parse(args : _*) - for(arg <- List(topicOpt, brokerListOpt)) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - - import scala.collection.JavaConversions._ - val useNewProducer = options.has(useNewProducerOpt) - val topic = options.valueOf(topicOpt) - val brokerList = options.valueOf(brokerListOpt) - val sync = options.has(syncOpt) - val compressionCodecOptionValue = options.valueOf(compressionCodecOpt) - val compressionCodec = if (options.has(compressionCodecOpt)) - if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty) - DefaultCompressionCodec.name - else compressionCodecOptionValue - else NoCompressionCodec.name - val batchSize = options.valueOf(batchSizeOpt) - val sendTimeout = options.valueOf(sendTimeoutOpt) - val queueSize = options.valueOf(queueSizeOpt) - val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt) - val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt) - val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt) - val messageSendMaxRetries = options.valueOf(messageSendMaxRetriesOpt) - val retryBackoffMs = options.valueOf(retryBackoffMsOpt) - val keyEncoderClass = options.valueOf(keyEncoderOpt) - val valueEncoderClass = options.valueOf(valueEncoderOpt) - val readerClass = options.valueOf(messageReaderOpt) - val socketBuffer = options.valueOf(socketBufferSizeOpt) - val cmdLineProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt)) - /* new producer related configs */ - val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt) - val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt) - val metadataExpiryMs = options.valueOf(metadataExpiryMsOpt) - val metadataFetchTimeoutMs = options.valueOf(metadataFetchTimeoutMsOpt) - } - - trait MessageReader { - def init(inputStream: InputStream, props: Properties) {} - def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] - def close() {} - } - - class LineMessageReader extends MessageReader { - var topic: String = null - var reader: BufferedReader = null - var parseKey = false - var keySeparator = "\t" - var ignoreError = false - var lineNumber = 0 - - override def init(inputStream: InputStream, props: Properties) { - topic = props.getProperty("topic") - if(props.containsKey("parse.key")) - parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true") - if(props.containsKey("key.separator")) - keySeparator = props.getProperty("key.separator") - if(props.containsKey("ignore.error")) - ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true") - reader = new BufferedReader(new InputStreamReader(inputStream)) - } - - override def readMessage() = { - lineNumber += 1 - (reader.readLine(), parseKey) match { - case (null, _) => null - case (line, true) => - line.indexOf(keySeparator) match { - case -1 => - if(ignoreError) - new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes()) - else - throw new KafkaException("No key found on line " + lineNumber + ": " + line) - case n => - new KeyedMessage[Array[Byte], Array[Byte]](topic, - line.substring(0, n).getBytes, - (if(n + keySeparator.size > line.size) "" else line.substring(n + 
keySeparator.size)).getBytes()) - } - case (line, false) => - new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes()) - } - } - } -} diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 5417628..0e22897 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -30,7 +30,7 @@ import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import scala.Some import kafka.common.TopicAndPartition -import kafka.consumer.MessageFormatter +import kafka.tools.MessageFormatter import java.io.PrintStream import org.apache.kafka.common.protocol.types.{Struct, Schema, Field} import org.apache.kafka.common.protocol.types.Type.STRING @@ -247,7 +247,7 @@ class OffsetManager(val config: OffsetManagerConfig, * Asynchronously read the partition from the offsets topic and populate the cache */ def loadOffsetsFromLog(offsetsPartition: Int) { - + val topicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition) loadingPartitions synchronized { @@ -477,4 +477,3 @@ case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition) } - diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala new file mode 100644 index 0000000..f6bc2f1 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import scala.collection.JavaConversions._ +import org.I0Itec.zkclient._ +import joptsimple._ +import java.util.Properties +import java.util.Random +import java.io.PrintStream +import kafka.message._ +import kafka.serializer._ +import kafka.utils._ +import kafka.metrics.KafkaMetricsReporter +import kafka.consumer.{Blacklist,Whitelist,ConsumerConfig,Consumer} + +/** + * Consumer that dumps messages out to standard out. 
+ * + */ +object ConsoleConsumer extends Logging { + + def main(args: Array[String]) { + val parser = new OptionParser + val topicIdOpt = parser.accepts("topic", "The topic id to consume on.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.") + .withRequiredArg + .describedAs("whitelist") + .ofType(classOf[String]) + val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.") + .withRequiredArg + .describedAs("blacklist") + .ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + + val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") + .withRequiredArg + .describedAs("config file") + .ofType(classOf[String]) + val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.") + .withRequiredArg + .describedAs("class") + .ofType(classOf[String]) + .defaultsTo(classOf[DefaultMessageFormatter].getName) + val messageFormatterArgOpt = parser.accepts("property") + .withRequiredArg + .describedAs("prop") + .ofType(classOf[String]) + val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up"); + val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + + "start with the earliest message present in the log rather than the latest message.") + val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. 
If not set, consumption is continual.") + .withRequiredArg + .describedAs("num_messages") + .ofType(classOf[java.lang.Integer]) + val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + + "skip it instead of halt.") + val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") + val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + + "set, the csv metrics will be outputed here") + .withRequiredArg + .describedAs("metrics dictory") + .ofType(classOf[java.lang.String]) + + var groupIdPassed = true + val options: OptionSet = tryParse(parser, args) + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) + if (topicOrFilterOpt.size != 1) { + error("Exactly one of whitelist/blacklist/topic is required.") + parser.printHelpOn(System.err) + System.exit(1) + } + val topicArg = options.valueOf(topicOrFilterOpt.head) + val filterSpec = if (options.has(blacklistOpt)) + new Blacklist(topicArg) + else + new Whitelist(topicArg) + + val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) + if (csvMetricsReporterEnabled) { + val csvReporterProps = new Properties() + csvReporterProps.put("kafka.metrics.polling.interval.secs", "5") + csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") + if (options.has(metricsDirectoryOpt)) + csvReporterProps.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) + else + csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics") + csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true") + val verifiableProps = new VerifiableProperties(csvReporterProps) + KafkaMetricsReporter.startReporters(verifiableProps) + } + + + + val consumerProps = if (options.has(consumerConfigOpt)) + Utils.loadProps(options.valueOf(consumerConfigOpt)) + else + new Properties() + + if(!consumerProps.containsKey("group.id")) { + consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000)) + groupIdPassed=false + } + consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") + consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt)) + if(!consumerProps.containsKey("dual.commit.enabled")) + consumerProps.put("dual.commit.enabled","false") + if(!consumerProps.containsKey("offsets.storage")) + consumerProps.put("offsets.storage","zookeeper") + + if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) && + checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) { + System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id") + +". 
Please use --delete-consumer-offsets to delete previous offsets metadata") + System.exit(1) + } + + if(options.has(deleteConsumerOffsetsOpt)) + ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id")) + + val config = new ConsumerConfig(consumerProps) + val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false + val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) + val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt)) + val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 + val connector = Consumer.create(config) + + Runtime.getRuntime.addShutdownHook(new Thread() { + override def run() { + connector.shutdown() + // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack + if(!groupIdPassed) + ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id")) + } + }) + + var numMessages = 0L + val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] + formatter.init(formatterArgs) + try { + val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) + val iter = if(maxMessages >= 0) + stream.slice(0, maxMessages) + else + stream + + for(messageAndTopic <- iter) { + try { + formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) + numMessages += 1 + } catch { + case e: Throwable => + if (skipMessageOnError) + error("Error processing message, skipping this message: ", e) + else + throw e + } + if(System.out.checkError()) { + // This means no one is listening to our output stream any more, time to shutdown + System.err.println("Unable to write to standard out, closing consumer.") + System.err.println("Consumed %d messages".format(numMessages)) + formatter.close() + connector.shutdown() + System.exit(1) + } + } + } catch { + case e: Throwable => error("Error processing message, stopping consumer: ", e) + } + System.err.println("Consumed %d messages".format(numMessages)) + System.out.flush() + formatter.close() + connector.shutdown() + } + + def tryParse(parser: OptionParser, args: Array[String]) = { + try { + parser.parse(args : _*) + } catch { + case e: OptionException => { + Utils.croak(e.getMessage) + null + } + } + } + + def checkZkPathExists(zkUrl: String, path: String): Boolean = { + try { + val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer); + zk.exists(path) + } catch { + case _: Throwable => false + } + } +} + +object MessageFormatter { + def tryParseFormatterArgs(args: Iterable[String]): Properties = { + val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0) + if(!splits.forall(_.length == 2)) { + System.err.println("Invalid parser arguments: " + args.mkString(" ")) + System.exit(1) + } + val props = new Properties + for(a <- splits) + props.put(a(0), a(1)) + props + } +} + +trait MessageFormatter { + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) + def init(props: Properties) {} + def close() {} +} + +class DefaultMessageFormatter extends MessageFormatter { + var printKey = false + var keySeparator = "\t".getBytes + var lineSeparator = "\n".getBytes + + override def init(props: Properties) { + if(props.containsKey("print.key")) + printKey = props.getProperty("print.key").trim.toLowerCase.equals("true") + 
if(props.containsKey("key.separator")) + keySeparator = props.getProperty("key.separator").getBytes + if(props.containsKey("line.separator")) + lineSeparator = props.getProperty("line.separator").getBytes + } + + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { + if(printKey) { + output.write(if (key == null) "null".getBytes() else key) + output.write(keySeparator) + } + output.write(if (value == null) "null".getBytes() else value) + output.write(lineSeparator) + } +} + +class NoOpMessageFormatter extends MessageFormatter { + override def init(props: Properties) {} + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {} +} + +class ChecksumMessageFormatter extends MessageFormatter { + private var topicStr: String = _ + + override def init(props: Properties) { + topicStr = props.getProperty("topic") + if (topicStr != null) + topicStr = topicStr + ":" + else + topicStr = "" + } + + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { + val chksum = new Message(value, key).checksum + output.println(topicStr + "checksum:" + chksum) + } +} diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala new file mode 100644 index 0000000..f4e07d4 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools + +import kafka.common._ +import kafka.message._ +import kafka.serializer._ +import kafka.utils.CommandLineUtils +import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage} + +import java.util.Properties +import java.io._ + +import joptsimple._ + +object ConsoleProducer { + + def main(args: Array[String]) { + + val config = new ProducerConfig(args) + val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader] + val props = new Properties + props.put("topic", config.topic) + props.putAll(config.cmdLineProps) + reader.init(System.in, props) + + try { + val producer = + if(config.useNewProducer) { + import org.apache.kafka.clients.producer.ProducerConfig + + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec) + props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString) + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString) + props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString) + props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString) + props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString) + props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString) + props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString) + props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString) + if(config.queueEnqueueTimeoutMs != -1) + props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString) + props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString) + props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") + + new NewShinyProducer(props) + } else { + props.put("metadata.broker.list", config.brokerList) + props.put("compression.codec", config.compressionCodec) + props.put("producer.type", if(config.sync) "sync" else "async") + props.put("batch.num.messages", config.batchSize.toString) + props.put("message.send.max.retries", config.messageSendMaxRetries.toString) + props.put("retry.backoff.ms", config.retryBackoffMs.toString) + props.put("queue.buffering.max.ms", config.sendTimeout.toString) + props.put("queue.buffering.max.messages", config.queueSize.toString) + props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString) + props.put("request.required.acks", config.requestRequiredAcks.toString) + props.put("request.timeout.ms", config.requestTimeoutMs.toString) + props.put("key.serializer.class", config.keyEncoderClass) + props.put("serializer.class", config.valueEncoderClass) + props.put("send.buffer.bytes", config.socketBuffer.toString) + props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString) + props.put("client.id", "console-producer") + + new OldProducer(props) + } + + Runtime.getRuntime.addShutdownHook(new Thread() { + override def run() { + producer.close() + } + }) + + var message: KeyedMessage[Array[Byte], Array[Byte]] = null + do { + message = reader.readMessage() + if(message != null) + producer.send(message.topic, message.key, message.message) + } while(message != null) + } catch { + case e: Exception => + e.printStackTrace + System.exit(1) + } + System.exit(0) + } + + class ProducerConfig(args: Array[String]) { + val parser = new OptionParser + val topicOpt = parser.accepts("topic", "REQUIRED: The topic id 
to produce messages to.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.") + .withRequiredArg + .describedAs("broker-list") + .ofType(classOf[String]) + val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") + val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." + + "If specified without value, than it defaults to 'gzip'") + .withOptionalArg() + .describedAs("compression-codec") + .ofType(classOf[String]) + val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(200) + val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message.") + .withRequiredArg + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3) + val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.") + .withRequiredArg + .ofType(classOf[java.lang.Long]) + .defaultsTo(100) + val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + + " a message will queue awaiting suffient batch size. The value is given in ms.") + .withRequiredArg + .describedAs("timeout_ms") + .ofType(classOf[java.lang.Long]) + .defaultsTo(1000) + val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " + + " messages will queue awaiting suffient batch size.") + .withRequiredArg + .describedAs("queue_size") + .ofType(classOf[java.lang.Long]) + .defaultsTo(10000) + val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue") + .withRequiredArg + .describedAs("queue enqueuetimeout ms") + .ofType(classOf[java.lang.Long]) + .defaultsTo(Int.MaxValue) + val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests") + .withRequiredArg + .describedAs("request required acks") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. 
Value must be non-negative and non-zero") + .withRequiredArg + .describedAs("request timeout ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1500) + val metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms", + "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes.") + .withRequiredArg + .describedAs("metadata expiration interval") + .ofType(classOf[java.lang.Long]) + .defaultsTo(5*60*1000L) + val metadataFetchTimeoutMsOpt = parser.accepts("metadata-fetch-timeout-ms", + "The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that topic.") + .withRequiredArg + .describedAs("metadata fetch timeout") + .ofType(classOf[java.lang.Long]) + .defaultsTo(60*1000L) + val maxMemoryBytesOpt = parser.accepts("max-memory-bytes", + "The total memory used by the producer to buffer records waiting to be sent to the server.") + .withRequiredArg + .describedAs("total memory in bytes") + .ofType(classOf[java.lang.Long]) + .defaultsTo(32 * 1024 * 1024L) + val maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes", + "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " + + "will attempt to optimistically group them together until this size is reached.") + .withRequiredArg + .describedAs("memory in bytes per partition") + .ofType(classOf[java.lang.Long]) + .defaultsTo(16 * 1024L) + val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.") + .withRequiredArg + .describedAs("encoder_class") + .ofType(classOf[java.lang.String]) + .defaultsTo(classOf[DefaultEncoder].getName) + val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.") + .withRequiredArg + .describedAs("encoder_class") + .ofType(classOf[java.lang.String]) + .defaultsTo(classOf[DefaultEncoder].getName) + val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " + + "By default each line is read as a separate message.") + .withRequiredArg + .describedAs("reader_class") + .ofType(classOf[java.lang.String]) + .defaultsTo(classOf[LineMessageReader].getName) + val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024*100) + val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. 
" + + "This allows custom configuration for a user-defined message reader.") + .withRequiredArg + .describedAs("prop") + .ofType(classOf[String]) + val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") + + val options = parser.parse(args : _*) + for(arg <- List(topicOpt, brokerListOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + import scala.collection.JavaConversions._ + val useNewProducer = options.has(useNewProducerOpt) + val topic = options.valueOf(topicOpt) + val brokerList = options.valueOf(brokerListOpt) + val sync = options.has(syncOpt) + val compressionCodecOptionValue = options.valueOf(compressionCodecOpt) + val compressionCodec = if (options.has(compressionCodecOpt)) + if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty) + DefaultCompressionCodec.name + else compressionCodecOptionValue + else NoCompressionCodec.name + val batchSize = options.valueOf(batchSizeOpt) + val sendTimeout = options.valueOf(sendTimeoutOpt) + val queueSize = options.valueOf(queueSizeOpt) + val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt) + val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt) + val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt) + val messageSendMaxRetries = options.valueOf(messageSendMaxRetriesOpt) + val retryBackoffMs = options.valueOf(retryBackoffMsOpt) + val keyEncoderClass = options.valueOf(keyEncoderOpt) + val valueEncoderClass = options.valueOf(valueEncoderOpt) + val readerClass = options.valueOf(messageReaderOpt) + val socketBuffer = options.valueOf(socketBufferSizeOpt) + val cmdLineProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt)) + /* new producer related configs */ + val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt) + val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt) + val metadataExpiryMs = options.valueOf(metadataExpiryMsOpt) + val metadataFetchTimeoutMs = options.valueOf(metadataFetchTimeoutMsOpt) + } + + trait MessageReader { + def init(inputStream: InputStream, props: Properties) {} + def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] + def close() {} + } + + class LineMessageReader extends MessageReader { + var topic: String = null + var reader: BufferedReader = null + var parseKey = false + var keySeparator = "\t" + var ignoreError = false + var lineNumber = 0 + + override def init(inputStream: InputStream, props: Properties) { + topic = props.getProperty("topic") + if(props.containsKey("parse.key")) + parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true") + if(props.containsKey("key.separator")) + keySeparator = props.getProperty("key.separator") + if(props.containsKey("ignore.error")) + ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true") + reader = new BufferedReader(new InputStreamReader(inputStream)) + } + + override def readMessage() = { + lineNumber += 1 + (reader.readLine(), parseKey) match { + case (null, _) => null + case (line, true) => + line.indexOf(keySeparator) match { + case -1 => + if(ignoreError) + new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes()) + else + throw new KafkaException("No key found on line " + lineNumber + ": " + line) + case n => + new KeyedMessage[Array[Byte], Array[Byte]](topic, + line.substring(0, n).getBytes, + (if(n + keySeparator.size > line.size) "" else line.substring(n + 
keySeparator.size)).getBytes()) + } + case (line, false) => + new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes()) + } + } + } +} diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala new file mode 100644 index 0000000..4688349 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicLong +import java.nio.channels.ClosedByInterruptException +import org.apache.log4j.Logger +import kafka.message.Message +import kafka.utils.ZkUtils +import java.util.{ Random, Properties } +import kafka.consumer._ +import java.text.SimpleDateFormat + +/** + * Performance test for the full zookeeper consumer + */ +object ConsumerPerformance { + private val logger = Logger.getLogger(getClass()) + + def main(args: Array[String]): Unit = { + + val config = new ConsumerPerfConfig(args) + logger.info("Starting consumer...") + var totalMessagesRead = new AtomicLong(0) + var totalBytesRead = new AtomicLong(0) + + if (!config.hideHeader) { + if (!config.showDetailedStats) + println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") + else + println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") + } + + // clean up zookeeper state for this group id for every perf run + ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId) + + val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig) + + val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads)) + var threadList = List[ConsumerPerfThread]() + for ((topic, streamList) <- topicMessageStreams) + for (i <- 0 until streamList.length) + threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, + totalMessagesRead, totalBytesRead) + + logger.info("Sleeping for 1 second.") + Thread.sleep(1000) + logger.info("starting threads") + val startMs = System.currentTimeMillis + for (thread <- threadList) + thread.start + + for (thread <- threadList) + thread.join + + val endMs = System.currentTimeMillis + val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0 + if (!config.showDetailedStats) { + val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024) + println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs), + config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, + 
totalMessagesRead.get / elapsedSecs)) + } + System.exit(0) + } + + class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val groupIdOpt = parser.accepts("group", "The group id to consume on.") + .withRequiredArg + .describedAs("gid") + .defaultsTo("perf-consumer-" + new Random().nextInt(100000)) + .ofType(classOf[String]) + val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024 * 1024) + val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + + "offset to consume from, start with the latest message present in the log rather than the earliest message.") + val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(2 * 1024 * 1024) + val numThreadsOpt = parser.accepts("threads", "Number of processing threads.") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(10) + val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + + val options = parser.parse(args: _*) + + for (arg <- List(topicOpt, zkConnectOpt)) { + if (!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + val props = new Properties + props.put("group.id", options.valueOf(groupIdOpt)) + props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) + props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) + props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest") + props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) + props.put("consumer.timeout.ms", "5000") + props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString) + val consumerConfig = new ConsumerConfig(props) + val numThreads = options.valueOf(numThreadsOpt).intValue + val topic = options.valueOf(topicOpt) + val numMessages = options.valueOf(numMessagesOpt).longValue + val reportingInterval = options.valueOf(reportingIntervalOpt).intValue + val showDetailedStats = options.has(showDetailedStatsOpt) + val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) + val hideHeader = options.has(hideHeaderOpt) + } + + class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]], + config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) + extends Thread(name) { + + override def run() { + var bytesRead = 0L + var messagesRead = 0L + val startMs = System.currentTimeMillis + var lastReportTime: Long = startMs + var lastBytesRead = 0L + var lastMessagesRead = 0L + + try { + val iter = stream.iterator + while (iter.hasNext && messagesRead < config.numMessages) { + val 
messageAndMetadata = iter.next + messagesRead += 1 + bytesRead += messageAndMetadata.message.length + + if (messagesRead % config.reportingInterval == 0) { + if (config.showDetailedStats) + printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis) + lastReportTime = System.currentTimeMillis + lastMessagesRead = messagesRead + lastBytesRead = bytesRead + } + } + } catch { + case _: InterruptedException => + case _: ClosedByInterruptException => + case _: ConsumerTimeoutException => + case e: Throwable => e.printStackTrace() + } + totalMessagesRead.addAndGet(messagesRead) + totalBytesRead.addAndGet(bytesRead) + if (config.showDetailedStats) + printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis) + } + + private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long, + startMs: Long, endMs: Long) = { + val elapsedMs = endMs - startMs + val totalMBRead = (bytesRead * 1.0) / (1024 * 1024) + val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024) + println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id, + config.consumerConfig.fetchMessageMaxBytes, totalMBRead, + 1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0)) + } + } + +} diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala new file mode 100644 index 0000000..129cc01 --- /dev/null +++ b/core/src/main/scala/kafka/tools/PerfConfig.scala @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package kafka.tools + +import joptsimple.OptionParser + + +class PerfConfig(args: Array[String]) { + val parser = new OptionParser + val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Long]) + .defaultsTo(Long.MaxValue) + val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(5000) + val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. 
" + + "See java.text.SimpleDateFormat for options.") + .withRequiredArg + .describedAs("date format") + .ofType(classOf[String]) + .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS") + val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + + "interval as configured by reporting-interval") + val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ") + val messageSizeOpt = parser.accepts("message-size", "The size of each message.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100) + val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(200) + val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") + .withRequiredArg + .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val helpOpt = parser.accepts("help", "Print usage.") +} diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala new file mode 100644 index 0000000..95cfbc1 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools + +import kafka.metrics.KafkaMetricsReporter +import kafka.producer.{OldProducer, NewShinyProducer} +import kafka.utils.{VerifiableProperties, Logging} +import kafka.message.CompressionCodec +import kafka.serializer._ + +import java.util.concurrent.{CountDownLatch, Executors} +import java.util.concurrent.atomic.AtomicLong +import java.util._ +import java.text.SimpleDateFormat +import java.math.BigInteger +import scala.collection.immutable.List + +import org.apache.log4j.Logger + +/** + * Load test for the producer + */ +object ProducerPerformance extends Logging { + + def main(args: Array[String]) { + + val logger = Logger.getLogger(getClass) + val config = new ProducerPerfConfig(args) + if (!config.isFixedSize) + logger.info("WARN: Throughput will be slower due to changing message size per request") + + val totalBytesSent = new AtomicLong(0) + val totalMessagesSent = new AtomicLong(0) + val executor = Executors.newFixedThreadPool(config.numThreads) + val allDone = new CountDownLatch(config.numThreads) + val startMs = System.currentTimeMillis + val rand = new java.util.Random + + if (!config.hideHeader) + println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " + + "total.data.sent.in.nMsg, nMsg.sec") + + for (i <- 0 until config.numThreads) { + executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand)) + } + + allDone.await() + val endMs = System.currentTimeMillis + val elapsedSecs = (endMs - startMs) / 1000.0 + val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024) + println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format( + config.dateFormat.format(startMs), config.dateFormat.format(endMs), + config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent, + totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs)) + System.exit(0) + } + + class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) { + val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap.") + .withRequiredArg + .describedAs("hostname:port,..,hostname:port") + .ofType(classOf[String]) + val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to") + .withRequiredArg + .describedAs("topic1,topic2..") + .ofType(classOf[String]) + val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3000) + val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3) + val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100) + val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " + + "to complete") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(-1) + val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.") + val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.") + val numThreadsOpt = parser.accepts("threads", "Number of sending threads.") + .withRequiredArg + .describedAs("number of threads") + 
.ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + val initialMessageIdOpt = parser.accepts("initial-message-id", "This is used for generating test data. If set, messages will be tagged with an " + + "ID and sent by producer starting from this ID sequentially. Message content will be String type and " + + "in the form of 'Message:000...1:xxx...'") + .withRequiredArg() + .describedAs("initial message id") + .ofType(classOf[java.lang.Integer]) + val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for the specified time between two sends") + .withRequiredArg() + .describedAs("message send time gap") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") + val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enabled is set, and this parameter is " + + "set, the csv metrics will be output here") + .withRequiredArg + .describedAs("metrics directory") + .ofType(classOf[java.lang.String]) + val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") + + val options = parser.parse(args: _*) + for (arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) { + if (!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + val topicsStr = options.valueOf(topicsOpt) + val topics = topicsStr.split(",") + val numMessages = options.valueOf(numMessagesOpt).longValue + val reportingInterval = options.valueOf(reportingIntervalOpt).intValue + val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) + val hideHeader = options.has(hideHeaderOpt) + val brokerList = options.valueOf(brokerListOpt) + val messageSize = options.valueOf(messageSizeOpt).intValue + var isFixedSize = !options.has(varyMessageSizeOpt) + var isSync = options.has(syncOpt) + var batchSize = options.valueOf(batchSizeOpt).intValue + var numThreads = options.valueOf(numThreadsOpt).intValue + val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue) + val seqIdMode = options.has(initialMessageIdOpt) + var initialMessageId: Int = 0 + if (seqIdMode) + initialMessageId = options.valueOf(initialMessageIdOpt).intValue() + val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue() + val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue() + val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue() + val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue() + val useNewProducer = options.has(useNewProducerOpt) + + val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) + + if (csvMetricsReporterEnabled) { + val props = new Properties() + props.put("kafka.metrics.polling.interval.secs", "1") + props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") + if (options.has(metricsDirectoryOpt)) + props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) + else + props.put("kafka.csv.metrics.dir", "kafka_metrics") + props.put("kafka.csv.metrics.reporter.enabled", "true") + val verifiableProps = new VerifiableProperties(props) + KafkaMetricsReporter.startReporters(verifiableProps) + } + + val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue() + } + + class ProducerThread(val threadId: Int, + val config: ProducerPerfConfig, 
+ val totalBytesSent: AtomicLong, + val totalMessagesSent: AtomicLong, + val allDone: CountDownLatch, + val rand: Random) extends Runnable { + val seqIdNumDigit = 10 // no. of digits for max int value + + val messagesPerThread = config.numMessages / config.numThreads + debug("Messages per thread = " + messagesPerThread) + val props = new Properties() + val producer = + if (config.useNewProducer) { + import org.apache.kafka.clients.producer.ProducerConfig + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) + props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) + props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance") + props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString) + props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString) + props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) + new NewShinyProducer(props) + } else { + props.put("metadata.broker.list", config.brokerList) + props.put("compression.codec", config.compressionCodec.codec.toString) + props.put("send.buffer.bytes", (64 * 1024).toString) + if (!config.isSync) { + props.put("producer.type", "async") + props.put("batch.num.messages", config.batchSize.toString) + props.put("queue.enqueue.timeout.ms", "-1") + } + props.put("client.id", "producer-performance") + props.put("request.required.acks", config.producerRequestRequiredAcks.toString) + props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) + props.put("message.send.max.retries", config.producerNumRetries.toString) + props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) + props.put("serializer.class", classOf[DefaultEncoder].getName) + props.put("key.serializer.class", classOf[NullEncoder[Long]].getName) + new OldProducer(props) + } + + // generate the sequential message ID + private val SEP = ":" // message field separator + private val messageIdLabel = "MessageID" + private val threadIdLabel = "ThreadID" + private val topicLabel = "Topic" + private var leftPaddedSeqId: String = "" + + private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = { + // Each thread gets a unique range of sequential no. for its ids. + // Eg. 1000 msg in 10 threads => 100 msg per thread + // thread 0 IDs : 0 ~ 99 + // thread 1 IDs : 100 ~ 199 + // thread 2 IDs : 200 ~ 299 + // . . . 
+ leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId)) + + val msgHeader = topicLabel + SEP + + topic + SEP + + threadIdLabel + SEP + + threadId + SEP + + messageIdLabel + SEP + + leftPaddedSeqId + SEP + + val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x') + debug(seqMsgString) + return seqMsgString.getBytes() + } + + private def generateProducerData(topic: String, messageId: Long): Array[Byte] = { + val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize) + if (config.seqIdMode) { + val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId + generateMessageWithSeqId(topic, seqId, msgSize) + } else { + new Array[Byte](msgSize) + } + } + + override def run { + var bytesSent = 0L + var nSends = 0 + var i: Long = 0L + var message: Array[Byte] = null + + while (i < messagesPerThread) { + try { + config.topics.foreach( + topic => { + message = generateProducerData(topic, i) + producer.send(topic, BigInteger.valueOf(i).toByteArray, message) + bytesSent += message.size + nSends += 1 + if (config.messageSendGapMs > 0) + Thread.sleep(config.messageSendGapMs) + }) + } catch { + case e: Throwable => error("Error when sending message " + new String(message), e) + } + i += 1 + } + try { + producer.close() + } catch { + case e: Throwable => error("Error when closing producer", e) + } + totalBytesSent.addAndGet(bytesSent) + totalMessagesSent.addAndGet(nSends) + allDone.countDown() + } + } +} diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala new file mode 100644 index 0000000..8b8c472 --- /dev/null +++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools + +import java.net.URI +import java.text.SimpleDateFormat +import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} +import kafka.consumer.SimpleConsumer +import kafka.utils._ +import org.apache.log4j.Logger +import kafka.common.TopicAndPartition + + +/** + * Performance test for the simple consumer + */ +object SimpleConsumerPerformance { + + def main(args: Array[String]) { + val logger = Logger.getLogger(getClass) + val config = new ConsumerPerfConfig(args) + + if(!config.hideHeader) { + if(!config.showDetailedStats) + println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") + else + println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") + } + + val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId) + + // reset to latest or smallest offset + val topicAndPartition = TopicAndPartition(config.topic, config.partition) + val request = OffsetRequest(Map( + topicAndPartition -> PartitionOffsetRequestInfo(if (config.fromLatest) OffsetRequest.LatestTime else OffsetRequest.EarliestTime, 1) + )) + var offset: Long = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head + + val startMs = System.currentTimeMillis + var done = false + var totalBytesRead = 0L + var totalMessagesRead = 0L + var consumedInterval = 0 + var lastReportTime: Long = startMs + var lastBytesRead = 0L + var lastMessagesRead = 0L + while(!done) { + // TODO: add in the maxWait and minBytes for performance + val request = new FetchRequestBuilder() + .clientId(config.clientId) + .addFetch(config.topic, config.partition, offset, config.fetchSize) + .build() + val fetchResponse = consumer.fetch(request) + + var messagesRead = 0 + var bytesRead = 0 + val messageSet = fetchResponse.messageSet(config.topic, config.partition) + for (message <- messageSet) { + messagesRead += 1 + bytesRead += message.message.payloadSize + } + + if(messagesRead == 0 || totalMessagesRead > config.numMessages) + done = true + else + // we only did one fetch so we find the offset for the first (head) messageset + offset += messageSet.validBytes + + totalBytesRead += bytesRead + totalMessagesRead += messagesRead + consumedInterval += messagesRead + + if(consumedInterval > config.reportingInterval) { + if(config.showDetailedStats) { + val reportTime = System.currentTimeMillis + val elapsed = (reportTime - lastReportTime)/1000.0 + val totalMBRead = ((totalBytesRead-lastBytesRead)*1.0)/(1024*1024) + println(("%s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(reportTime), config.fetchSize, + (totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed, + totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed)) + } + lastReportTime = SystemTime.milliseconds + lastBytesRead = totalBytesRead + lastMessagesRead = totalMessagesRead + consumedInterval = 0 + } + } + val reportTime = System.currentTimeMillis + val elapsed = (reportTime - startMs) / 1000.0 + + if(!config.showDetailedStats) { + val totalMBRead = (totalBytesRead*1.0)/(1024*1024) + println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), + config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, totalMBRead/elapsed, + totalMessagesRead, totalMessagesRead/elapsed)) + } + System.exit(0) + } + + class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { + val urlOpt = parser.accepts("server", "REQUIRED: The 
hostname of the server to connect to.") + .withRequiredArg + .describedAs("kafka://hostname:port") + .ofType(classOf[String]) + val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + + "offset to consume from, start with the latest message present in the log rather than the earliest message.") + val partitionOpt = parser.accepts("partition", "The topic partition to consume from.") + .withRequiredArg + .describedAs("partition") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.") + .withRequiredArg + .describedAs("bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024*1024) + val clientIdOpt = parser.accepts("clientId", "The ID of this client.") + .withRequiredArg + .describedAs("clientId") + .ofType(classOf[String]) + .defaultsTo("SimpleConsumerPerformanceClient") + + val options = parser.parse(args : _*) + + for(arg <- List(topicOpt, urlOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + val url = new URI(options.valueOf(urlOpt)) + val fetchSize = options.valueOf(fetchSizeOpt).intValue + val fromLatest = options.has(resetBeginningOffsetOpt) + val partition = options.valueOf(partitionOpt).intValue + val topic = options.valueOf(topicOpt) + val numMessages = options.valueOf(numMessagesOpt).longValue + val reportingInterval = options.valueOf(reportingIntervalOpt).intValue + val showDetailedStats = options.has(showDetailedStatsOpt) + val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) + val hideHeader = options.has(hideHeaderOpt) + val clientId = options.valueOf(clientIdOpt).toString + } +} diff --git a/perf/config/log4j.properties b/perf/config/log4j.properties deleted file mode 100644 index 542b739..0000000 --- a/perf/config/log4j.properties +++ /dev/null @@ -1,24 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-log4j.rootLogger=INFO, fileAppender - -log4j.appender.fileAppender=org.apache.log4j.FileAppender -log4j.appender.fileAppender.File=perf.log -log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout -log4j.appender.fileAppender.layout.ConversionPattern=%m %n - -# Turn on all our debugging info -log4j.logger.kafka=INFO - diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala deleted file mode 100644 index 4dde468..0000000 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.perf - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicLong -import java.nio.channels.ClosedByInterruptException -import org.apache.log4j.Logger -import kafka.message.Message -import kafka.utils.ZkUtils -import java.util.{ Random, Properties } -import kafka.consumer._ -import java.text.SimpleDateFormat - -/** - * Performance test for the full zookeeper consumer - */ -object ConsumerPerformance { - private val logger = Logger.getLogger(getClass()) - - def main(args: Array[String]): Unit = { - - val config = new ConsumerPerfConfig(args) - logger.info("Starting consumer...") - var totalMessagesRead = new AtomicLong(0) - var totalBytesRead = new AtomicLong(0) - - if (!config.hideHeader) { - if (!config.showDetailedStats) - println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") - else - println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") - } - - // clean up zookeeper state for this group id for every perf run - ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId) - - val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig) - - val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads)) - var threadList = List[ConsumerPerfThread]() - for ((topic, streamList) <- topicMessageStreams) - for (i <- 0 until streamList.length) - threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, - totalMessagesRead, totalBytesRead) - - logger.info("Sleeping for 1 second.") - Thread.sleep(1000) - logger.info("starting threads") - val startMs = System.currentTimeMillis - for (thread <- threadList) - thread.start - - for (thread <- threadList) - thread.join - - val endMs = System.currentTimeMillis - val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0 - if (!config.showDetailedStats) { - val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024) - println(("%s, %s, %d, %.4f, %.4f, %d, 
%.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs), - config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, - totalMessagesRead.get / elapsedSecs)) - } - System.exit(0) - } - - class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val groupIdOpt = parser.accepts("group", "The group id to consume on.") - .withRequiredArg - .describedAs("gid") - .defaultsTo("perf-consumer-" + new Random().nextInt(100000)) - .ofType(classOf[String]) - val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024 * 1024) - val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + - "offset to consume from, start with the latest message present in the log rather than the earliest message.") - val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(2 * 1024 * 1024) - val numThreadsOpt = parser.accepts("threads", "Number of processing threads.") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(10) - val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - - val options = parser.parse(args: _*) - - for (arg <- List(topicOpt, zkConnectOpt)) { - if (!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - - val props = new Properties - props.put("group.id", options.valueOf(groupIdOpt)) - props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) - props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) - props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest") - props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) - props.put("consumer.timeout.ms", "5000") - props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString) - val consumerConfig = new ConsumerConfig(props) - val numThreads = options.valueOf(numThreadsOpt).intValue - val topic = options.valueOf(topicOpt) - val numMessages = options.valueOf(numMessagesOpt).longValue - val reportingInterval = options.valueOf(reportingIntervalOpt).intValue - val showDetailedStats = options.has(showDetailedStatsOpt) - val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) - val hideHeader = options.has(hideHeaderOpt) - } - - class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]], - config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) - extends Thread(name) { - - override def run() { - var bytesRead = 0L - var messagesRead = 0L - val startMs = System.currentTimeMillis - var 
lastReportTime: Long = startMs - var lastBytesRead = 0L - var lastMessagesRead = 0L - - try { - val iter = stream.iterator - while (iter.hasNext && messagesRead < config.numMessages) { - val messageAndMetadata = iter.next - messagesRead += 1 - bytesRead += messageAndMetadata.message.length - - if (messagesRead % config.reportingInterval == 0) { - if (config.showDetailedStats) - printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis) - lastReportTime = System.currentTimeMillis - lastMessagesRead = messagesRead - lastBytesRead = bytesRead - } - } - } catch { - case _: InterruptedException => - case _: ClosedByInterruptException => - case _: ConsumerTimeoutException => - case e: Throwable => e.printStackTrace() - } - totalMessagesRead.addAndGet(messagesRead) - totalBytesRead.addAndGet(bytesRead) - if (config.showDetailedStats) - printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis) - } - - private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long, - startMs: Long, endMs: Long) = { - val elapsedMs = endMs - startMs - val totalMBRead = (bytesRead * 1.0) / (1024 * 1024) - val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024) - println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id, - config.consumerConfig.fetchMessageMaxBytes, totalMBRead, - 1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0)) - } - } - -} diff --git a/perf/src/main/scala/kafka/perf/PerfConfig.scala b/perf/src/main/scala/kafka/perf/PerfConfig.scala deleted file mode 100644 index a8fc6b9..0000000 --- a/perf/src/main/scala/kafka/perf/PerfConfig.scala +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package kafka.perf - -import joptsimple.OptionParser - - -class PerfConfig(args: Array[String]) { - val parser = new OptionParser - val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Long]) - .defaultsTo(Long.MaxValue) - val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(5000) - val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. 
" + - "See java.text.SimpleDateFormat for options.") - .withRequiredArg - .describedAs("date format") - .ofType(classOf[String]) - .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS") - val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + - "interval as configured by reporting-interval") - val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ") - val messageSizeOpt = parser.accepts("message-size", "The size of each message.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(100) - val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(200) - val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") - .withRequiredArg - .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) - val helpOpt = parser.accepts("help", "Print usage.") -} diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala deleted file mode 100644 index 00fa90b..0000000 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.perf - -import kafka.metrics.KafkaMetricsReporter -import kafka.producer.{OldProducer, NewShinyProducer} -import kafka.utils.{VerifiableProperties, Logging} -import kafka.message.CompressionCodec -import kafka.serializer._ - -import java.util.concurrent.{CountDownLatch, Executors} -import java.util.concurrent.atomic.AtomicLong -import java.util._ -import java.text.SimpleDateFormat -import java.math.BigInteger -import scala.collection.immutable.List - -import org.apache.log4j.Logger - -/** - * Load test for the producer - */ -object ProducerPerformance extends Logging { - - def main(args: Array[String]) { - - val logger = Logger.getLogger(getClass) - val config = new ProducerPerfConfig(args) - if (!config.isFixedSize) - logger.info("WARN: Throughput will be slower due to changing message size per request") - - val totalBytesSent = new AtomicLong(0) - val totalMessagesSent = new AtomicLong(0) - val executor = Executors.newFixedThreadPool(config.numThreads) - val allDone = new CountDownLatch(config.numThreads) - val startMs = System.currentTimeMillis - val rand = new java.util.Random - - if (!config.hideHeader) - println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " + - "total.data.sent.in.nMsg, nMsg.sec") - - for (i <- 0 until config.numThreads) { - executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand)) - } - - allDone.await() - val endMs = System.currentTimeMillis - val elapsedSecs = (endMs - startMs) / 1000.0 - val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024) - println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format( - config.dateFormat.format(startMs), config.dateFormat.format(endMs), - config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent, - totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs)) - System.exit(0) - } - - class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) { - val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap.") - .withRequiredArg - .describedAs("hostname:port,..,hostname:port") - .ofType(classOf[String]) - val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to") - .withRequiredArg - .describedAs("topic1,topic2..") - .ofType(classOf[String]) - val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(3000) - val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(3) - val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(100) - val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " + - "to complete") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(-1) - val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.") - val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.") - val numThreadsOpt = parser.accepts("threads", "Number of sending threads.") - .withRequiredArg - .describedAs("number of threads") - 
.ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " + - "ID and sent by producer starting from this ID sequentially. Message content will be String type and " + - "in the form of 'Message:000...1:xxx...'") - .withRequiredArg() - .describedAs("initial message id") - .ofType(classOf[java.lang.Integer]) - val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends") - .withRequiredArg() - .describedAs("message send time gap") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) - val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") - val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + - "set, the csv metrics will be outputed here") - .withRequiredArg - .describedAs("metrics dictory") - .ofType(classOf[java.lang.String]) - val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") - - val options = parser.parse(args: _*) - for (arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) { - if (!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - val topicsStr = options.valueOf(topicsOpt) - val topics = topicsStr.split(",") - val numMessages = options.valueOf(numMessagesOpt).longValue - val reportingInterval = options.valueOf(reportingIntervalOpt).intValue - val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) - val hideHeader = options.has(hideHeaderOpt) - val brokerList = options.valueOf(brokerListOpt) - val messageSize = options.valueOf(messageSizeOpt).intValue - var isFixedSize = !options.has(varyMessageSizeOpt) - var isSync = options.has(syncOpt) - var batchSize = options.valueOf(batchSizeOpt).intValue - var numThreads = options.valueOf(numThreadsOpt).intValue - val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue) - val seqIdMode = options.has(initialMessageIdOpt) - var initialMessageId: Int = 0 - if (seqIdMode) - initialMessageId = options.valueOf(initialMessageIdOpt).intValue() - val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue() - val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue() - val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue() - val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue() - val useNewProducer = options.has(useNewProducerOpt) - - val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) - - if (csvMetricsReporterEnabled) { - val props = new Properties() - props.put("kafka.metrics.polling.interval.secs", "1") - props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") - if (options.has(metricsDirectoryOpt)) - props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) - else - props.put("kafka.csv.metrics.dir", "kafka_metrics") - props.put("kafka.csv.metrics.reporter.enabled", "true") - val verifiableProps = new VerifiableProperties(props) - KafkaMetricsReporter.startReporters(verifiableProps) - } - - val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue() - } - - class ProducerThread(val threadId: Int, - val config: ProducerPerfConfig, 
- val totalBytesSent: AtomicLong, - val totalMessagesSent: AtomicLong, - val allDone: CountDownLatch, - val rand: Random) extends Runnable { - val seqIdNumDigit = 10 // no. of digits for max int value - - val messagesPerThread = config.numMessages / config.numThreads - debug("Messages per thread = " + messagesPerThread) - val props = new Properties() - val producer = - if (config.useNewProducer) { - import org.apache.kafka.clients.producer.ProducerConfig - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) - props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) - props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance") - props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString) - props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString) - props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) - props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) - new NewShinyProducer(props) - } else { - props.put("metadata.broker.list", config.brokerList) - props.put("compression.codec", config.compressionCodec.codec.toString) - props.put("send.buffer.bytes", (64 * 1024).toString) - if (!config.isSync) { - props.put("producer.type", "async") - props.put("batch.num.messages", config.batchSize.toString) - props.put("queue.enqueue.timeout.ms", "-1") - } - props.put("client.id", "producer-performance") - props.put("request.required.acks", config.producerRequestRequiredAcks.toString) - props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) - props.put("message.send.max.retries", config.producerNumRetries.toString) - props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) - props.put("serializer.class", classOf[DefaultEncoder].getName) - props.put("key.serializer.class", classOf[NullEncoder[Long]].getName) - new OldProducer(props) - } - - // generate the sequential message ID - private val SEP = ":" // message field separator - private val messageIdLabel = "MessageID" - private val threadIdLabel = "ThreadID" - private val topicLabel = "Topic" - private var leftPaddedSeqId: String = "" - - private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = { - // Each thread gets a unique range of sequential no. for its ids. - // Eg. 1000 msg in 10 threads => 100 msg per thread - // thread 0 IDs : 0 ~ 99 - // thread 1 IDs : 100 ~ 199 - // thread 2 IDs : 200 ~ 299 - // . . . 
- leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId)) - - val msgHeader = topicLabel + SEP + - topic + SEP + - threadIdLabel + SEP + - threadId + SEP + - messageIdLabel + SEP + - leftPaddedSeqId + SEP - - val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x') - debug(seqMsgString) - return seqMsgString.getBytes() - } - - private def generateProducerData(topic: String, messageId: Long): Array[Byte] = { - val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize) - if (config.seqIdMode) { - val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId - generateMessageWithSeqId(topic, seqId, msgSize) - } else { - new Array[Byte](msgSize) - } - } - - override def run { - var bytesSent = 0L - var nSends = 0 - var i: Long = 0L - var message: Array[Byte] = null - - while (i < messagesPerThread) { - try { - config.topics.foreach( - topic => { - message = generateProducerData(topic, i) - producer.send(topic, BigInteger.valueOf(i).toByteArray, message) - bytesSent += message.size - nSends += 1 - if (config.messageSendGapMs > 0) - Thread.sleep(config.messageSendGapMs) - }) - } catch { - case e: Throwable => error("Error when sending message " + new String(message), e) - } - i += 1 - } - try { - producer.close() - } catch { - case e: Throwable => error("Error when closing producer", e) - } - totalBytesSent.addAndGet(bytesSent) - totalMessagesSent.addAndGet(nSends) - allDone.countDown() - } - } -} diff --git a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala deleted file mode 100644 index c52ada0..0000000 --- a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.perf - -import java.net.URI -import java.text.SimpleDateFormat -import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} -import kafka.consumer.SimpleConsumer -import kafka.utils._ -import org.apache.log4j.Logger -import kafka.common.TopicAndPartition - - -/** - * Performance test for the simple consumer - */ -object SimpleConsumerPerformance { - - def main(args: Array[String]) { - val logger = Logger.getLogger(getClass) - val config = new ConsumerPerfConfig(args) - - if(!config.hideHeader) { - if(!config.showDetailedStats) - println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") - else - println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") - } - - val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId) - - // reset to latest or smallest offset - val topicAndPartition = TopicAndPartition(config.topic, config.partition) - val request = OffsetRequest(Map( - topicAndPartition -> PartitionOffsetRequestInfo(if (config.fromLatest) OffsetRequest.LatestTime else OffsetRequest.EarliestTime, 1) - )) - var offset: Long = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head - - val startMs = System.currentTimeMillis - var done = false - var totalBytesRead = 0L - var totalMessagesRead = 0L - var consumedInterval = 0 - var lastReportTime: Long = startMs - var lastBytesRead = 0L - var lastMessagesRead = 0L - while(!done) { - // TODO: add in the maxWait and minBytes for performance - val request = new FetchRequestBuilder() - .clientId(config.clientId) - .addFetch(config.topic, config.partition, offset, config.fetchSize) - .build() - val fetchResponse = consumer.fetch(request) - - var messagesRead = 0 - var bytesRead = 0 - val messageSet = fetchResponse.messageSet(config.topic, config.partition) - for (message <- messageSet) { - messagesRead += 1 - bytesRead += message.message.payloadSize - } - - if(messagesRead == 0 || totalMessagesRead > config.numMessages) - done = true - else - // we only did one fetch so we find the offset for the first (head) messageset - offset += messageSet.validBytes - - totalBytesRead += bytesRead - totalMessagesRead += messagesRead - consumedInterval += messagesRead - - if(consumedInterval > config.reportingInterval) { - if(config.showDetailedStats) { - val reportTime = System.currentTimeMillis - val elapsed = (reportTime - lastReportTime)/1000.0 - val totalMBRead = ((totalBytesRead-lastBytesRead)*1.0)/(1024*1024) - println(("%s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(reportTime), config.fetchSize, - (totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed, - totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed)) - } - lastReportTime = SystemTime.milliseconds - lastBytesRead = totalBytesRead - lastMessagesRead = totalMessagesRead - consumedInterval = 0 - } - } - val reportTime = System.currentTimeMillis - val elapsed = (reportTime - startMs) / 1000.0 - - if(!config.showDetailedStats) { - val totalMBRead = (totalBytesRead*1.0)/(1024*1024) - println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), - config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, totalMBRead/elapsed, - totalMessagesRead, totalMessagesRead/elapsed)) - } - System.exit(0) - } - - class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { - val urlOpt = parser.accepts("server", "REQUIRED: The 
hostname of the server to connect to.") - .withRequiredArg - .describedAs("kafka://hostname:port") - .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + - "offset to consume from, start with the latest message present in the log rather than the earliest message.") - val partitionOpt = parser.accepts("partition", "The topic partition to consume from.") - .withRequiredArg - .describedAs("partition") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) - val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.") - .withRequiredArg - .describedAs("bytes") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024*1024) - val clientIdOpt = parser.accepts("clientId", "The ID of this client.") - .withRequiredArg - .describedAs("clientId") - .ofType(classOf[String]) - .defaultsTo("SimpleConsumerPerformanceClient") - - val options = parser.parse(args : _*) - - for(arg <- List(topicOpt, urlOpt)) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - val url = new URI(options.valueOf(urlOpt)) - val fetchSize = options.valueOf(fetchSizeOpt).intValue - val fromLatest = options.has(resetBeginningOffsetOpt) - val partition = options.valueOf(partitionOpt).intValue - val topic = options.valueOf(topicOpt) - val numMessages = options.valueOf(numMessagesOpt).longValue - val reportingInterval = options.valueOf(reportingIntervalOpt).intValue - val showDetailedStats = options.has(showDetailedStatsOpt) - val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) - val hideHeader = options.has(hideHeaderOpt) - val clientId = options.valueOf(clientIdOpt).toString - } -} diff --git a/system_test/broker_failure/bin/run-test.sh b/system_test/broker_failure/bin/run-test.sh index 1f11180..549cd1f 100755 --- a/system_test/broker_failure/bin/run-test.sh +++ b/system_test/broker_failure/bin/run-test.sh @@ -5,9 +5,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,7 +17,7 @@ # =========== # run-test.sh # =========== - + # ==================================== # Do not change the followings # (keep this section at the beginning @@ -52,9 +52,9 @@ readonly source_console_consumer_grp=source readonly target_console_consumer_grp=target readonly message_size=100 readonly console_consumer_timeout_ms=15000 -readonly num_kafka_source_server=4 # requires same no. of property files such as: +readonly num_kafka_source_server=4 # requires same no. of property files such as: # $base_dir/config/server_source{1..4}.properties -readonly num_kafka_target_server=3 # requires same no. of property files such as: +readonly num_kafka_target_server=3 # requires same no. 
of property files such as: # $base_dir/config/server_target{1..3}.properties readonly num_kafka_mirror_maker=3 # any values greater than 0 readonly wait_time_after_killing_broker=0 # wait after broker is stopped but before starting again @@ -65,8 +65,8 @@ readonly wait_time_after_restarting_broker=10 # ==================================== num_msg_per_batch=500 # no. of msg produced in each calling of ProducerPerformance num_producer_threads=5 # no. of producer threads to send msg -producer_sleep_min=5 # min & max sleep time (in sec) between each -producer_sleep_max=5 # batch of messages sent from producer +producer_sleep_min=5 # min & max sleep time (in sec) between each +producer_sleep_max=5 # batch of messages sent from producer # ==================================== # zookeeper @@ -255,7 +255,7 @@ create_topic() { --topic $this_topic_to_create \ --zookeeper $this_zk_conn_str \ --replica $this_replica_factor \ - 2> $kafka_topic_creation_log_file + 2> $kafka_topic_creation_log_file } # ========================================= @@ -281,7 +281,7 @@ start_zk() { start_source_servers_cluster() { info "starting source cluster" - for ((i=1; i<=$num_kafka_source_server; i++)) + for ((i=1; i<=$num_kafka_source_server; i++)) do start_source_server $i done @@ -367,13 +367,13 @@ start_console_consumer() { info "starting console consumers for $this_consumer_grp" - $base_dir/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer \ + $base_dir/bin/kafka-run-class.sh kafka.tools.ConsoleConsumer \ --zookeeper localhost:$this_consumer_zk_port \ --topic $test_topic \ --group $this_consumer_grp \ --from-beginning \ --consumer-timeout-ms $console_consumer_timeout_ms \ - --formatter "kafka.consumer.ConsoleConsumer\$${this_msg_formatter}" \ + --formatter "kafka.tools.ConsoleConsumer\$${this_msg_formatter}" \ 2>&1 > ${this_consumer_log} & console_consumer_pid=$! 
@@ -448,7 +448,7 @@ start_background_producer() { info "producing $num_msg_per_batch messages on topic '$topic'" $base_dir/bin/kafka-run-class.sh \ - kafka.perf.ProducerPerformance \ + kafka.tools.ProducerPerformance \ --brokerinfo zk.connect=localhost:2181 \ --topics $topic \ --messages $num_msg_per_batch \ @@ -499,7 +499,7 @@ cmp_checksum() { crc_only_in_producer=`comm -23 $producer_performance_mid_sorted_uniq_log $console_consumer_source_mid_sorted_uniq_log` - duplicate_mirror_mid=`comm -23 $console_consumer_target_mid_sorted_log $console_consumer_target_mid_sorted_uniq_log` + duplicate_mirror_mid=`comm -23 $console_consumer_target_mid_sorted_log $console_consumer_target_mid_sorted_uniq_log` no_of_duplicate_msg=$(( $msg_count_from_mirror_consumer - $uniq_msg_count_from_mirror_consumer \ + $msg_count_from_source_consumer - $uniq_msg_count_from_source_consumer - \ 2*$duplicate_msg_in_producer )) @@ -521,19 +521,19 @@ cmp_checksum() { echo "" echo "========================================================" >> $checksum_diff_log - echo "crc only in producer" >> $checksum_diff_log + echo "crc only in producer" >> $checksum_diff_log echo "========================================================" >> $checksum_diff_log - echo "${crc_only_in_producer}" >> $checksum_diff_log + echo "${crc_only_in_producer}" >> $checksum_diff_log echo "" >> $checksum_diff_log echo "========================================================" >> $checksum_diff_log - echo "crc only in source consumer" >> $checksum_diff_log + echo "crc only in source consumer" >> $checksum_diff_log echo "========================================================" >> $checksum_diff_log - echo "${crc_only_in_source_consumer}" >> $checksum_diff_log + echo "${crc_only_in_source_consumer}" >> $checksum_diff_log echo "" >> $checksum_diff_log echo "========================================================" >> $checksum_diff_log echo "crc only in mirror consumer" >> $checksum_diff_log echo "========================================================" >> $checksum_diff_log - echo "${crc_only_in_mirror_consumer}" >> $checksum_diff_log + echo "${crc_only_in_mirror_consumer}" >> $checksum_diff_log echo "" >> $checksum_diff_log echo "========================================================" >> $checksum_diff_log echo "duplicate crc in mirror consumer" >> $checksum_diff_log @@ -583,8 +583,8 @@ start_test() { info "Started background producer pid [${background_producer_pid}]" sleep 5 - - # loop for no. of iterations specified in $num_iterations + + # loop for no. of iterations specified in $num_iterations while [ $num_iterations -ge $iter ] do # if $svr_to_bounce is '0', it means no bouncing diff --git a/system_test/producer_perf/bin/run-compression-test.sh b/system_test/producer_perf/bin/run-compression-test.sh index ea20f0d..5297d1f 100755 --- a/system_test/producer_perf/bin/run-compression-test.sh +++ b/system_test/producer_perf/bin/run-compression-test.sh @@ -5,9 +5,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -28,9 +28,9 @@ $base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>& sleep 4 echo "start producing $num_messages messages ..." -$base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topics test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --compression-codec 1 +$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topics test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async --compression-codec 1 -echo "wait for data to be persisted" +echo "wait for data to be persisted" cur_offset="-1" quit=0 while [ $quit -eq 0 ] @@ -59,4 +59,3 @@ fi ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null sleep 2 ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null - diff --git a/system_test/producer_perf/bin/run-test.sh b/system_test/producer_perf/bin/run-test.sh index bb60817..9a3b885 100755 --- a/system_test/producer_perf/bin/run-test.sh +++ b/system_test/producer_perf/bin/run-test.sh @@ -5,9 +5,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -28,9 +28,9 @@ $base_dir/../../bin/kafka-server-start.sh $base_dir/config/server.properties 2>& sleep 4 echo "start producing $num_messages messages ..." 
-$base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topics test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async +$base_dir/../../bin/kafka-run-class.sh kafka.tools.ProducerPerformance --brokerinfo broker.list=0:localhost:9092 --topics test01 --messages $num_messages --message-size $message_size --batch-size 200 --threads 1 --reporting-interval 100000 num_messages --async -echo "wait for data to be persisted" +echo "wait for data to be persisted" cur_offset="-1" quit=0 while [ $quit -eq 0 ] @@ -59,4 +59,3 @@ fi ps ax | grep -i 'kafka.kafka' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null sleep 2 ps ax | grep -i 'QuorumPeerMain' | grep -v grep | awk '{print $1}' | xargs kill -15 > /dev/null - diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py index de02e47..0947bf2 100644 --- a/system_test/utils/kafka_system_test_utils.py +++ b/system_test/utils/kafka_system_test_utils.py @@ -117,7 +117,7 @@ def generate_testcase_log_dirs(systemTestEnv, testcaseEnv): # create the role directory under dashboards dashboardsRoleDir = dashboardsPathName + "/" + role if not os.path.exists(dashboardsRoleDir) : os.makedirs(dashboardsRoleDir) - + def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv): anonLogger.info("================================================") @@ -212,7 +212,7 @@ def collect_logs_from_remote_hosts(systemTestEnv, testcaseEnv): logger.debug("executing command [" + cmdStr + "]", extra=d) system_test_utils.sys_call(cmdStr) - + def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv): testCaseBaseDir = testcaseEnv.testCaseBaseDir @@ -432,9 +432,9 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv sys.exit(1) addedCSVConfig = {} - addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", clusterCfg["entity_id"], "metrics") - addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" - addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" + addedCSVConfig["kafka.csv.metrics.dir"] = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", clusterCfg["entity_id"], "metrics") + addedCSVConfig["kafka.metrics.polling.interval.secs"] = "5" + addedCSVConfig["kafka.metrics.reporters"] = "kafka.metrics.KafkaCSVMetricsReporter" addedCSVConfig["kafka.csv.metrics.reporter.enabled"] = "true" if brokerVersion == "0.7": @@ -466,7 +466,7 @@ def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv tcCfg["zookeeper.connect"] = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"] copy_file_with_dict_values(cfgTemplatePathname + "/mirror_consumer.properties", cfgDestPathname + "/" + tcCfg["mirror_consumer_config_filename"], tcCfg, None) - + else: logger.debug("UNHANDLED role " + clusterCfg["role"], extra=d) @@ -495,7 +495,7 @@ def scp_file_to_remote_host(clusterEntityConfigDictList, testcaseEnv): def start_zookeepers(systemTestEnv, testcaseEnv): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - zkEntityIdList = system_test_utils.get_data_from_list_of_dicts( + zkEntityIdList = system_test_utils.get_data_from_list_of_dicts( clusterEntityConfigDictList, "role", "zookeeper", "entity_id") for zkEntityId in zkEntityIdList: @@ -534,7 +534,7 @@ def start_zookeepers(systemTestEnv, testcaseEnv): def start_brokers(systemTestEnv, 
testcaseEnv): clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( + brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( clusterEntityConfigDictList, "role", "broker", "entity_id") for brokerEntityId in brokerEntityIdList: @@ -558,7 +558,7 @@ def start_mirror_makers(systemTestEnv, testcaseEnv, onlyThisEntityId=None): start_entity_in_background(systemTestEnv, testcaseEnv, onlyThisEntityId) else: clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( + brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( clusterEntityConfigDictList, "role", "mirror_maker", "entity_id") for brokerEntityId in brokerEntityIdList: @@ -571,17 +571,17 @@ def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv, leaderAttributesDic # keep track of broker related data in this dict such as broker id, # entity id and timestamp and return it to the caller function - shutdownBrokerDict = {} + shutdownBrokerDict = {} clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList - brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( + brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( clusterEntityConfigDictList, "role", "broker", "entity_id") for brokerEntityId in brokerEntityIdList: - hostname = system_test_utils.get_data_by_lookup_keyval( + hostname = system_test_utils.get_data_by_lookup_keyval( clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname") - logFile = system_test_utils.get_data_by_lookup_keyval( + logFile = system_test_utils.get_data_by_lookup_keyval( testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename") logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default") @@ -629,7 +629,7 @@ def get_leader_elected_log_line(systemTestEnv, testcaseEnv, leaderAttributesDict # keep track of leader related data in this dict such as broker id, # entity id and timestamp and return it to the caller function - leaderDict = {} + leaderDict = {} clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \ @@ -754,7 +754,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): "--whitelist=\".*\" >> ", logPathName + "/" + logFile + " & echo pid:$! 
> ", logPathName + "/entity_" + entityId + "_pid'"] - else: + else: cmdList = ["ssh " + hostname, "'JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, @@ -815,7 +815,7 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId): cmdList = ["ssh " + hostname, "'JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, - kafkaHome + "/bin/kafka-run-class.sh kafka.consumer.ConsoleConsumer", + kafkaHome + "/bin/kafka-run-class.sh kafka.tools.ConsoleConsumer", "--zookeeper " + zkConnectStr, "--topic " + topic, "--consumer-timeout-ms " + timeoutMs, @@ -861,9 +861,9 @@ def start_console_consumer(systemTestEnv, testcaseEnv): for consumerConfig in consumerConfigList: host = consumerConfig["hostname"] entityId = consumerConfig["entity_id"] - jmxPort = consumerConfig["jmx_port"] + jmxPort = consumerConfig["jmx_port"] role = consumerConfig["role"] - clusterName = consumerConfig["cluster_name"] + clusterName = consumerConfig["cluster_name"] kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "java_home") jmxPort = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "jmx_port") @@ -928,7 +928,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv): cmdList = ["ssh " + host, "'JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, - kafkaRunClassBin + " kafka.consumer.ConsoleConsumer", + kafkaRunClassBin + " kafka.tools.ConsoleConsumer", "--zookeeper " + zkConnectStr, "--topic " + topic, "--consumer-timeout-ms " + timeoutMs, @@ -974,8 +974,8 @@ def start_producer_performance(systemTestEnv, testcaseEnv, kafka07Client): for producerConfig in producerConfigList: host = producerConfig["hostname"] entityId = producerConfig["entity_id"] - jmxPort = producerConfig["jmx_port"] - role = producerConfig["role"] + jmxPort = producerConfig["jmx_port"] + role = producerConfig["role"] thread.start_new_thread(start_producer_in_thread, (testcaseEnv, entityConfigList, producerConfig, kafka07Client)) logger.debug("calling testcaseEnv.lock.acquire()", extra=d) @@ -1017,7 +1017,7 @@ def generate_topics_string(topicPrefix, numOfTopics): def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafka07Client): host = producerConfig["hostname"] entityId = producerConfig["entity_id"] - jmxPort = producerConfig["jmx_port"] + jmxPort = producerConfig["jmx_port"] role = producerConfig["role"] clusterName = producerConfig["cluster_name"] kafkaHome = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "kafka_home") @@ -1109,7 +1109,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk "'JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome, - kafkaRunClassBin + " kafka.perf.ProducerPerformance", + kafkaRunClassBin + " kafka.tools.ProducerPerformance", "--broker-list " + brokerListStr, "--initial-message-id " + str(initMsgId), "--messages " + noMsgPerBatch, @@ -1145,7 +1145,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk "'JAVA_HOME=" + javaHome, "JMX_PORT=" + jmxPort, "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/test-log4j.properties" % kafkaHome, - kafkaRunClassBin + " kafka.perf.ProducerPerformance", + kafkaRunClassBin + " kafka.tools.ProducerPerformance", "--brokerinfo " + brokerInfoStr, "--initial-message-id " + str(initMsgId), "--messages " + 
noMsgPerBatch, @@ -1255,7 +1255,7 @@ def create_topic_for_producer_performance(systemTestEnv, testcaseEnv): testcaseBaseDir = replace_kafka_home(testcaseBaseDir, kafkaHome) for topic in topicsList: - logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d) + logger.info("creating topic: [" + topic + "] at: [" + zkConnectStr + "]", extra=d) cmdList = ["ssh " + zkHost, "'JAVA_HOME=" + javaHome, createTopicBin, @@ -1264,7 +1264,7 @@ def create_topic_for_producer_performance(systemTestEnv, testcaseEnv): " --replication-factor " + testcaseEnv.testcaseArgumentsDict["replica_factor"], " --partitions " + testcaseEnv.testcaseArgumentsDict["num_partition"] + " >> ", testcaseBaseDir + "/logs/create_source_cluster_topic.log'"] - + cmdStr = " ".join(cmdList) logger.debug("executing command: [" + cmdStr + "]", extra=d) subproc = system_test_utils.sys_call_return_subproc(cmdStr) @@ -1556,7 +1556,7 @@ def ps_grep_terminate_running_entity(systemTestEnv): cmdStr = " ".join(cmdList) logger.debug("executing command [" + cmdStr + "]", extra=d) - system_test_utils.sys_call(cmdStr) + system_test_utils.sys_call(cmdStr) def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict, leaderAttributesDict): leaderEntityId = None @@ -1613,7 +1613,7 @@ def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict, leaderAttribu if shutdownTimestamp > 0: leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownTimestamp) logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d) - + return leaderReElectionLatency @@ -1649,7 +1649,7 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv): break logger.debug("calling testcaseEnv.lock.release()", extra=d) testcaseEnv.lock.release() - + testcaseEnv.producerHostParentPidDict.clear() for hostname, consumerPPid in testcaseEnv.consumerHostParentPidDict.items(): @@ -1684,8 +1684,8 @@ def start_migration_tool(systemTestEnv, testcaseEnv, onlyThisEntityId=None): if onlyThisEntityId is None or entityId == onlyThisEntityId: host = migrationToolConfig["hostname"] - jmxPort = migrationToolConfig["jmx_port"] - role = migrationToolConfig["role"] + jmxPort = migrationToolConfig["jmx_port"] + role = migrationToolConfig["role"] kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "java_home") jmxPort = system_test_utils.get_data_by_lookup_keyval(clusterConfigList, "entity_id", entityId, "jmx_port") @@ -1751,7 +1751,7 @@ def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv): producerEntityId = prodPerfCfg["entity_id"] topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic") - consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( + consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( clusterEntityConfigDictList, "role", "console_consumer", "entity_id") matchingConsumerEntityId = None @@ -1765,7 +1765,7 @@ def validate_07_08_migrated_data_matched(systemTestEnv, testcaseEnv): if matchingConsumerEntityId is None: break - msgChecksumMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( + msgChecksumMissingInConsumerLogPathName = get_testcase_config_log_dir_pathname( testcaseEnv, "console_consumer", matchingConsumerEntityId, "default") \ + "/msg_checksum_missing_in_consumer.log" producerLogPath = 
get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", producerEntityId, "default") @@ -1850,7 +1850,7 @@ def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName # |- 00000000000000000020.log # |- . . . - # loop through all topicPartition directories such as : test_1-0, test_1-1, ... + # loop through all topicPartition directories such as : test_1-0, test_1-1, ... for topicPartition in os.listdir(localLogSegmentPath): # found a topic-partition directory if os.path.isdir(localLogSegmentPath + "/" + topicPartition): @@ -1903,7 +1903,7 @@ def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName # 'test_2-0' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'], # 'test_2-1' : ['d41d8cd98f00b204e9800998ecf8427e','d41d8cd98f00b204e9800998ecf8427e'] # } - + for brokerTopicPartitionKey, md5Checksum in brokerLogCksumDict.items(): tokens = brokerTopicPartitionKey.split(":") brokerKey = tokens[0] @@ -1929,7 +1929,7 @@ def validate_broker_log_segment_checksum(systemTestEnv, testcaseEnv, clusterName logger.debug("merged log segment checksum in " + topicPartition + " matched", extra=d) else: logger.error("unexpected error in " + topicPartition, extra=d) - + if failureCount == 0: validationStatusDict["Validate for merged log segment checksum in cluster [" + clusterName + "]"] = "PASSED" else: @@ -1942,8 +1942,8 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None for consumerConfig in consumerConfigList: host = consumerConfig["hostname"] entityId = consumerConfig["entity_id"] - jmxPort = consumerConfig["jmx_port"] - clusterName = consumerConfig["cluster_name"] + jmxPort = consumerConfig["jmx_port"] + clusterName = consumerConfig["cluster_name"] kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "kafka_home") javaHome = system_test_utils.get_data_by_lookup_keyval(clusterList, "entity_id", entityId, "java_home") kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh" @@ -2007,16 +2007,16 @@ def start_simple_consumer(systemTestEnv, testcaseEnv, minStartingOffsetDict=None "--no-wait-at-logend ", " > " + outputFilePathName, " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"] - + cmdStr = " ".join(cmdList) - + logger.debug("executing command: [" + cmdStr + "]", extra=d) subproc_1 = system_test_utils.sys_call_return_subproc(cmdStr) # dummy for-loop to wait until the process is completed for line in subproc_1.stdout.readlines(): - pass + pass time.sleep(1) - + partitionId += 1 replicaIndex += 1 @@ -2025,7 +2025,7 @@ def get_controller_attributes(systemTestEnv, testcaseEnv): logger.info("Querying Zookeeper for Controller info ...", extra=d) # keep track of controller data in this dict such as broker id & entity id - controllerDict = {} + controllerDict = {} clusterConfigsList = systemTestEnv.clusterEntityConfigDictList tcConfigsList = testcaseEnv.testcaseConfigsList @@ -2080,7 +2080,7 @@ def getMinCommonStartingOffset(systemTestEnv, testcaseEnv, clusterName="source") logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default") localLogSegmentPath = logPathName + "/" + remoteLogSegmentDir - # loop through all topicPartition directories such as : test_1-0, test_1-1, ... + # loop through all topicPartition directories such as : test_1-0, test_1-1, ... 
for topicPartition in sorted(os.listdir(localLogSegmentPath)): # found a topic-partition directory if os.path.isdir(localLogSegmentPath + "/" + topicPartition): @@ -2119,7 +2119,7 @@ def getMinCommonStartingOffset(systemTestEnv, testcaseEnv, clusterName="source") # u'3:test_2-0': '0', # u'3:test_2-1': '0'} - # loop through brokerLogStartOffsetDict to get the min common starting offset for each topic-partition + # loop through brokerLogStartOffsetDict to get the min common starting offset for each topic-partition for brokerTopicPartition in sorted(brokerLogStartOffsetDict.iterkeys()): topicPartition = brokerTopicPartition.split(':')[1] @@ -2434,7 +2434,7 @@ def get_leader_attributes(systemTestEnv, testcaseEnv): logger.info("Querying Zookeeper for leader info ...", extra=d) # keep track of leader data in this dict such as broker id & entity id - leaderDict = {} + leaderDict = {} clusterConfigsList = systemTestEnv.clusterEntityConfigDictList tcConfigsList = testcaseEnv.testcaseConfigsList @@ -2482,6 +2482,3 @@ def get_leader_attributes(systemTestEnv, testcaseEnv): print leaderDict return leaderDict - - -
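Note on usage after this patch (a minimal sketch, not part of the change itself): the performance tools keep their command-line interfaces but now live under kafka.tools, so any script that launched them through kafka-run-class.sh only needs the package prefix updated. The host, port, topic name, and counts below are placeholder values; the flag names are copied from the invocations shown in this patch and assume the option parsing is unchanged by the package move.

    # old: bin/kafka-run-class.sh kafka.perf.ProducerPerformance ...
    bin/kafka-run-class.sh kafka.tools.ProducerPerformance \
        --brokerinfo broker.list=0:localhost:9092 \
        --topics test01 \
        --messages 10000 \
        --message-size 100 \
        --batch-size 200 \
        --threads 1

    # old: bin/kafka-run-class.sh kafka.perf.SimpleConsumerPerformance ...
    bin/kafka-run-class.sh kafka.tools.SimpleConsumerPerformance \
        --server kafka://localhost:9092 \
        --topic test01 \
        --fetch-size 1048576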