diff --git a/bin/kafka-console-consumer.sh b/bin/kafka-console-consumer.sh
index e410dde..07c90a9 100755
--- a/bin/kafka-console-consumer.sh
+++ b/bin/kafka-console-consumer.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.consumer.ConsoleConsumer $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer $@
diff --git a/bin/kafka-console-producer.sh b/bin/kafka-console-producer.sh
index cd8ce62..ccca66d 100755
--- a/bin/kafka-console-producer.sh
+++ b/bin/kafka-console-producer.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,4 +17,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.producer.ConsoleProducer $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer $@
diff --git a/bin/kafka-consumer-perf-test.sh b/bin/kafka-consumer-perf-test.sh
index 4ed3ed9..ebc513a 100755
--- a/bin/kafka-consumer-perf-test.sh
+++ b/bin/kafka-consumer-perf-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,4 +17,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.perf.ConsumerPerformance $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance $@
diff --git a/bin/kafka-producer-perf-test.sh b/bin/kafka-producer-perf-test.sh
index b4efc29..84ac949 100755
--- a/bin/kafka-producer-perf-test.sh
+++ b/bin/kafka-producer-perf-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,4 +17,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.perf.ProducerPerformance $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ProducerPerformance $@
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index d2fc8c0..5d5021d 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -41,11 +41,6 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
-for file in $base_dir/perf/build/libs//kafka-perf_${SCALA_VERSION}*.jar;
-do
-  CLASSPATH=$CLASSPATH:$file
-done
-
 for file in $base_dir/examples/build/libs//kafka-examples*.jar;
 do
   CLASSPATH=$CLASSPATH:$file
 done
@@ -155,6 +150,3 @@ if [ "x$DAEMON_MODE" = "xtrue" ]; then
 else
   exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
 fi
-
-
-
diff --git a/bin/kafka-simple-consumer-perf-test.sh b/bin/kafka-simple-consumer-perf-test.sh
index 2d3e3d3..b1a5cfc 100755
--- a/bin/kafka-simple-consumer-perf-test.sh
+++ b/bin/kafka-simple-consumer-perf-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.perf.SimpleConsumerPerformance $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerPerformance $@
diff --git a/bin/windows/kafka-console-consumer.bat b/bin/windows/kafka-console-consumer.bat
index 94b20a4..f70f98a 100644
--- a/bin/windows/kafka-console-consumer.bat
+++ b/bin/windows/kafka-console-consumer.bat
@@ -16,5 +16,5 @@ rem limitations under the License.
 
 SetLocal
 set KAFKA_HEAP_OPTS=-Xmx512M
-%~dp0kafka-run-class.bat kafka.consumer.ConsoleConsumer %*
+%~dp0kafka-run-class.bat kafka.tools.ConsoleConsumer %*
 EndLocal
diff --git a/bin/windows/kafka-console-producer.bat b/bin/windows/kafka-console-producer.bat
index b116e64..a5b57de 100644
--- a/bin/windows/kafka-console-producer.bat
+++ b/bin/windows/kafka-console-producer.bat
@@ -16,5 +16,5 @@ rem limitations under the License.
SetLocal set KAFKA_HEAP_OPTS=-Xmx512M -%~dp0kafka-run-class.bat kafka.producer.ConsoleProducer %* +%~dp0kafka-run-class.bat kafka.tools.ConsoleProducer %* EndLocal diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala deleted file mode 100644 index 1a16c69..0000000 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ /dev/null @@ -1,284 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.consumer - -import scala.collection.JavaConversions._ -import org.I0Itec.zkclient._ -import joptsimple._ -import java.util.Properties -import java.util.Random -import java.io.PrintStream -import kafka.message._ -import kafka.serializer._ -import kafka.utils._ -import kafka.metrics.KafkaMetricsReporter - - -/** - * Consumer that dumps messages out to standard out. - * - */ -object ConsoleConsumer extends Logging { - - def main(args: Array[String]) { - val parser = new OptionParser - val topicIdOpt = parser.accepts("topic", "The topic id to consume on.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.") - .withRequiredArg - .describedAs("whitelist") - .ofType(classOf[String]) - val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.") - .withRequiredArg - .describedAs("blacklist") - .ofType(classOf[String]) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - - val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") - .withRequiredArg - .describedAs("config file") - .ofType(classOf[String]) - val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.") - .withRequiredArg - .describedAs("class") - .ofType(classOf[String]) - .defaultsTo(classOf[DefaultMessageFormatter].getName) - val messageFormatterArgOpt = parser.accepts("property") - .withRequiredArg - .describedAs("prop") - .ofType(classOf[String]) - val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up"); - val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + - "start with the earliest message present in the log rather than the latest message.") - val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. 
If not set, consumption is continual.") - .withRequiredArg - .describedAs("num_messages") - .ofType(classOf[java.lang.Integer]) - val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + - "skip it instead of halt.") - val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") - val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + - "set, the csv metrics will be outputed here") - .withRequiredArg - .describedAs("metrics dictory") - .ofType(classOf[java.lang.String]) - - var groupIdPassed = true - val options: OptionSet = tryParse(parser, args) - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) - val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) - if (topicOrFilterOpt.size != 1) { - error("Exactly one of whitelist/blacklist/topic is required.") - parser.printHelpOn(System.err) - System.exit(1) - } - val topicArg = options.valueOf(topicOrFilterOpt.head) - val filterSpec = if (options.has(blacklistOpt)) - new Blacklist(topicArg) - else - new Whitelist(topicArg) - - val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) - if (csvMetricsReporterEnabled) { - val csvReporterProps = new Properties() - csvReporterProps.put("kafka.metrics.polling.interval.secs", "5") - csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") - if (options.has(metricsDirectoryOpt)) - csvReporterProps.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) - else - csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics") - csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true") - val verifiableProps = new VerifiableProperties(csvReporterProps) - KafkaMetricsReporter.startReporters(verifiableProps) - } - - - - val consumerProps = if (options.has(consumerConfigOpt)) - Utils.loadProps(options.valueOf(consumerConfigOpt)) - else - new Properties() - - if(!consumerProps.containsKey("group.id")) { - consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000)) - groupIdPassed=false - } - consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") - consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt)) - if(!consumerProps.containsKey("dual.commit.enabled")) - consumerProps.put("dual.commit.enabled","false") - if(!consumerProps.containsKey("offsets.storage")) - consumerProps.put("offsets.storage","zookeeper") - - if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) && - checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) { - System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id") - +". 
Please use --delete-consumer-offsets to delete previous offsets metadata") - System.exit(1) - } - - if(options.has(deleteConsumerOffsetsOpt)) - ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id")) - - val config = new ConsumerConfig(consumerProps) - val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false - val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) - val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt)) - val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 - val connector = Consumer.create(config) - - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run() { - connector.shutdown() - // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack - if(!groupIdPassed) - ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id")) - } - }) - - var numMessages = 0L - val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] - formatter.init(formatterArgs) - try { - val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) - val iter = if(maxMessages >= 0) - stream.slice(0, maxMessages) - else - stream - - for(messageAndTopic <- iter) { - try { - formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) - numMessages += 1 - } catch { - case e: Throwable => - if (skipMessageOnError) - error("Error processing message, skipping this message: ", e) - else - throw e - } - if(System.out.checkError()) { - // This means no one is listening to our output stream any more, time to shutdown - System.err.println("Unable to write to standard out, closing consumer.") - System.err.println("Consumed %d messages".format(numMessages)) - formatter.close() - connector.shutdown() - System.exit(1) - } - } - } catch { - case e: Throwable => error("Error processing message, stopping consumer: ", e) - } - System.err.println("Consumed %d messages".format(numMessages)) - System.out.flush() - formatter.close() - connector.shutdown() - } - - def tryParse(parser: OptionParser, args: Array[String]) = { - try { - parser.parse(args : _*) - } catch { - case e: OptionException => { - Utils.croak(e.getMessage) - null - } - } - } - - def checkZkPathExists(zkUrl: String, path: String): Boolean = { - try { - val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer); - zk.exists(path) - } catch { - case _: Throwable => false - } - } -} - -object MessageFormatter { - def tryParseFormatterArgs(args: Iterable[String]): Properties = { - val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0) - if(!splits.forall(_.length == 2)) { - System.err.println("Invalid parser arguments: " + args.mkString(" ")) - System.exit(1) - } - val props = new Properties - for(a <- splits) - props.put(a(0), a(1)) - props - } -} - -trait MessageFormatter { - def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) - def init(props: Properties) {} - def close() {} -} - -class DefaultMessageFormatter extends MessageFormatter { - var printKey = false - var keySeparator = "\t".getBytes - var lineSeparator = "\n".getBytes - - override def init(props: Properties) { - if(props.containsKey("print.key")) - printKey = props.getProperty("print.key").trim.toLowerCase.equals("true") - 
if(props.containsKey("key.separator")) - keySeparator = props.getProperty("key.separator").getBytes - if(props.containsKey("line.separator")) - lineSeparator = props.getProperty("line.separator").getBytes - } - - def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { - if(printKey) { - output.write(if (key == null) "null".getBytes() else key) - output.write(keySeparator) - } - output.write(if (value == null) "null".getBytes() else value) - output.write(lineSeparator) - } -} - -class NoOpMessageFormatter extends MessageFormatter { - override def init(props: Properties) {} - def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {} -} - -class ChecksumMessageFormatter extends MessageFormatter { - private var topicStr: String = _ - - override def init(props: Properties) { - topicStr = props.getProperty("topic") - if (topicStr != null) - topicStr = topicStr + ":" - else - topicStr = "" - } - - def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { - val chksum = new Message(value, key).checksum - output.println(topicStr + "checksum:" + chksum) - } -} diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala deleted file mode 100644 index a2af988..0000000 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ /dev/null @@ -1,299 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.producer - -import kafka.common._ -import kafka.message._ -import kafka.serializer._ -import kafka.utils.CommandLineUtils - -import java.util.Properties -import java.io._ - -import joptsimple._ - -object ConsoleProducer { - - def main(args: Array[String]) { - - val config = new ProducerConfig(args) - val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader] - val props = new Properties - props.put("topic", config.topic) - props.putAll(config.cmdLineProps) - reader.init(System.in, props) - - try { - val producer = - if(config.useNewProducer) { - import org.apache.kafka.clients.producer.ProducerConfig - - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec) - props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString) - props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString) - props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString) - props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString) - props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString) - props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString) - props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString) - props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString) - if(config.queueEnqueueTimeoutMs != -1) - props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString) - props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString) - props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") - - new NewShinyProducer(props) - } else { - props.put("metadata.broker.list", config.brokerList) - props.put("compression.codec", config.compressionCodec) - props.put("producer.type", if(config.sync) "sync" else "async") - props.put("batch.num.messages", config.batchSize.toString) - props.put("message.send.max.retries", config.messageSendMaxRetries.toString) - props.put("retry.backoff.ms", config.retryBackoffMs.toString) - props.put("queue.buffering.max.ms", config.sendTimeout.toString) - props.put("queue.buffering.max.messages", config.queueSize.toString) - props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString) - props.put("request.required.acks", config.requestRequiredAcks.toString) - props.put("request.timeout.ms", config.requestTimeoutMs.toString) - props.put("key.serializer.class", config.keyEncoderClass) - props.put("serializer.class", config.valueEncoderClass) - props.put("send.buffer.bytes", config.socketBuffer.toString) - props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString) - props.put("client.id", "console-producer") - - new OldProducer(props) - } - - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run() { - producer.close() - } - }) - - var message: KeyedMessage[Array[Byte], Array[Byte]] = null - do { - message = reader.readMessage() - if(message != null) - producer.send(message.topic, message.key, message.message) - } while(message != null) - } catch { - case e: Exception => - e.printStackTrace - System.exit(1) - } - System.exit(0) - } - - class ProducerConfig(args: Array[String]) { - val parser = new OptionParser - val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.") - .withRequiredArg - 
.describedAs("topic") - .ofType(classOf[String]) - val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.") - .withRequiredArg - .describedAs("broker-list") - .ofType(classOf[String]) - val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") - val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." + - "If specified without value, than it defaults to 'gzip'") - .withOptionalArg() - .describedAs("compression-codec") - .ofType(classOf[String]) - val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(200) - val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message.") - .withRequiredArg - .ofType(classOf[java.lang.Integer]) - .defaultsTo(3) - val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.") - .withRequiredArg - .ofType(classOf[java.lang.Long]) - .defaultsTo(100) - val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + - " a message will queue awaiting suffient batch size. The value is given in ms.") - .withRequiredArg - .describedAs("timeout_ms") - .ofType(classOf[java.lang.Long]) - .defaultsTo(1000) - val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " + - " messages will queue awaiting suffient batch size.") - .withRequiredArg - .describedAs("queue_size") - .ofType(classOf[java.lang.Long]) - .defaultsTo(10000) - val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue") - .withRequiredArg - .describedAs("queue enqueuetimeout ms") - .ofType(classOf[java.lang.Long]) - .defaultsTo(Int.MaxValue) - val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests") - .withRequiredArg - .describedAs("request required acks") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) - val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. 
Value must be non-negative and non-zero") - .withRequiredArg - .describedAs("request timeout ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1500) - val metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms", - "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes.") - .withRequiredArg - .describedAs("metadata expiration interval") - .ofType(classOf[java.lang.Long]) - .defaultsTo(5*60*1000L) - val metadataFetchTimeoutMsOpt = parser.accepts("metadata-fetch-timeout-ms", - "The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that topic.") - .withRequiredArg - .describedAs("metadata fetch timeout") - .ofType(classOf[java.lang.Long]) - .defaultsTo(60*1000L) - val maxMemoryBytesOpt = parser.accepts("max-memory-bytes", - "The total memory used by the producer to buffer records waiting to be sent to the server.") - .withRequiredArg - .describedAs("total memory in bytes") - .ofType(classOf[java.lang.Long]) - .defaultsTo(32 * 1024 * 1024L) - val maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes", - "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " + - "will attempt to optimistically group them together until this size is reached.") - .withRequiredArg - .describedAs("memory in bytes per partition") - .ofType(classOf[java.lang.Long]) - .defaultsTo(16 * 1024L) - val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.") - .withRequiredArg - .describedAs("encoder_class") - .ofType(classOf[java.lang.String]) - .defaultsTo(classOf[DefaultEncoder].getName) - val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.") - .withRequiredArg - .describedAs("encoder_class") - .ofType(classOf[java.lang.String]) - .defaultsTo(classOf[DefaultEncoder].getName) - val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " + - "By default each line is read as a separate message.") - .withRequiredArg - .describedAs("reader_class") - .ofType(classOf[java.lang.String]) - .defaultsTo(classOf[LineMessageReader].getName) - val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024*100) - val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. 
" + - "This allows custom configuration for a user-defined message reader.") - .withRequiredArg - .describedAs("prop") - .ofType(classOf[String]) - val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") - - val options = parser.parse(args : _*) - for(arg <- List(topicOpt, brokerListOpt)) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - - import scala.collection.JavaConversions._ - val useNewProducer = options.has(useNewProducerOpt) - val topic = options.valueOf(topicOpt) - val brokerList = options.valueOf(brokerListOpt) - val sync = options.has(syncOpt) - val compressionCodecOptionValue = options.valueOf(compressionCodecOpt) - val compressionCodec = if (options.has(compressionCodecOpt)) - if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty) - DefaultCompressionCodec.name - else compressionCodecOptionValue - else NoCompressionCodec.name - val batchSize = options.valueOf(batchSizeOpt) - val sendTimeout = options.valueOf(sendTimeoutOpt) - val queueSize = options.valueOf(queueSizeOpt) - val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt) - val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt) - val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt) - val messageSendMaxRetries = options.valueOf(messageSendMaxRetriesOpt) - val retryBackoffMs = options.valueOf(retryBackoffMsOpt) - val keyEncoderClass = options.valueOf(keyEncoderOpt) - val valueEncoderClass = options.valueOf(valueEncoderOpt) - val readerClass = options.valueOf(messageReaderOpt) - val socketBuffer = options.valueOf(socketBufferSizeOpt) - val cmdLineProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt)) - /* new producer related configs */ - val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt) - val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt) - val metadataExpiryMs = options.valueOf(metadataExpiryMsOpt) - val metadataFetchTimeoutMs = options.valueOf(metadataFetchTimeoutMsOpt) - } - - trait MessageReader { - def init(inputStream: InputStream, props: Properties) {} - def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] - def close() {} - } - - class LineMessageReader extends MessageReader { - var topic: String = null - var reader: BufferedReader = null - var parseKey = false - var keySeparator = "\t" - var ignoreError = false - var lineNumber = 0 - - override def init(inputStream: InputStream, props: Properties) { - topic = props.getProperty("topic") - if(props.containsKey("parse.key")) - parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true") - if(props.containsKey("key.separator")) - keySeparator = props.getProperty("key.separator") - if(props.containsKey("ignore.error")) - ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true") - reader = new BufferedReader(new InputStreamReader(inputStream)) - } - - override def readMessage() = { - lineNumber += 1 - (reader.readLine(), parseKey) match { - case (null, _) => null - case (line, true) => - line.indexOf(keySeparator) match { - case -1 => - if(ignoreError) - new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes()) - else - throw new KafkaException("No key found on line " + lineNumber + ": " + line) - case n => - new KeyedMessage[Array[Byte], Array[Byte]](topic, - line.substring(0, n).getBytes, - (if(n + keySeparator.size > line.size) "" else line.substring(n + 
keySeparator.size)).getBytes()) - } - case (line, false) => - new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes()) - } - } - } -} diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 5417628..0e22897 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -30,7 +30,7 @@ import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import scala.Some import kafka.common.TopicAndPartition -import kafka.consumer.MessageFormatter +import kafka.tools.MessageFormatter import java.io.PrintStream import org.apache.kafka.common.protocol.types.{Struct, Schema, Field} import org.apache.kafka.common.protocol.types.Type.STRING @@ -247,7 +247,7 @@ class OffsetManager(val config: OffsetManagerConfig, * Asynchronously read the partition from the offsets topic and populate the cache */ def loadOffsetsFromLog(offsetsPartition: Int) { - + val topicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition) loadingPartitions synchronized { @@ -477,4 +477,3 @@ case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition) } - diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala new file mode 100644 index 0000000..f6bc2f1 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import scala.collection.JavaConversions._ +import org.I0Itec.zkclient._ +import joptsimple._ +import java.util.Properties +import java.util.Random +import java.io.PrintStream +import kafka.message._ +import kafka.serializer._ +import kafka.utils._ +import kafka.metrics.KafkaMetricsReporter +import kafka.consumer.{Blacklist,Whitelist,ConsumerConfig,Consumer} + +/** + * Consumer that dumps messages out to standard out. 
+ * + */ +object ConsoleConsumer extends Logging { + + def main(args: Array[String]) { + val parser = new OptionParser + val topicIdOpt = parser.accepts("topic", "The topic id to consume on.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.") + .withRequiredArg + .describedAs("whitelist") + .ofType(classOf[String]) + val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.") + .withRequiredArg + .describedAs("blacklist") + .ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + + val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.") + .withRequiredArg + .describedAs("config file") + .ofType(classOf[String]) + val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.") + .withRequiredArg + .describedAs("class") + .ofType(classOf[String]) + .defaultsTo(classOf[DefaultMessageFormatter].getName) + val messageFormatterArgOpt = parser.accepts("property") + .withRequiredArg + .describedAs("prop") + .ofType(classOf[String]) + val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up"); + val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + + "start with the earliest message present in the log rather than the latest message.") + val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. 
If not set, consumption is continual.") + .withRequiredArg + .describedAs("num_messages") + .ofType(classOf[java.lang.Integer]) + val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + + "skip it instead of halt.") + val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") + val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + + "set, the csv metrics will be outputed here") + .withRequiredArg + .describedAs("metrics dictory") + .ofType(classOf[java.lang.String]) + + var groupIdPassed = true + val options: OptionSet = tryParse(parser, args) + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) + if (topicOrFilterOpt.size != 1) { + error("Exactly one of whitelist/blacklist/topic is required.") + parser.printHelpOn(System.err) + System.exit(1) + } + val topicArg = options.valueOf(topicOrFilterOpt.head) + val filterSpec = if (options.has(blacklistOpt)) + new Blacklist(topicArg) + else + new Whitelist(topicArg) + + val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) + if (csvMetricsReporterEnabled) { + val csvReporterProps = new Properties() + csvReporterProps.put("kafka.metrics.polling.interval.secs", "5") + csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") + if (options.has(metricsDirectoryOpt)) + csvReporterProps.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) + else + csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics") + csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true") + val verifiableProps = new VerifiableProperties(csvReporterProps) + KafkaMetricsReporter.startReporters(verifiableProps) + } + + + + val consumerProps = if (options.has(consumerConfigOpt)) + Utils.loadProps(options.valueOf(consumerConfigOpt)) + else + new Properties() + + if(!consumerProps.containsKey("group.id")) { + consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000)) + groupIdPassed=false + } + consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") + consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt)) + if(!consumerProps.containsKey("dual.commit.enabled")) + consumerProps.put("dual.commit.enabled","false") + if(!consumerProps.containsKey("offsets.storage")) + consumerProps.put("offsets.storage","zookeeper") + + if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) && + checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) { + System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id") + +". 
Please use --delete-consumer-offsets to delete previous offsets metadata") + System.exit(1) + } + + if(options.has(deleteConsumerOffsetsOpt)) + ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id")) + + val config = new ConsumerConfig(consumerProps) + val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false + val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) + val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt)) + val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 + val connector = Consumer.create(config) + + Runtime.getRuntime.addShutdownHook(new Thread() { + override def run() { + connector.shutdown() + // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack + if(!groupIdPassed) + ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id")) + } + }) + + var numMessages = 0L + val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] + formatter.init(formatterArgs) + try { + val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) + val iter = if(maxMessages >= 0) + stream.slice(0, maxMessages) + else + stream + + for(messageAndTopic <- iter) { + try { + formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) + numMessages += 1 + } catch { + case e: Throwable => + if (skipMessageOnError) + error("Error processing message, skipping this message: ", e) + else + throw e + } + if(System.out.checkError()) { + // This means no one is listening to our output stream any more, time to shutdown + System.err.println("Unable to write to standard out, closing consumer.") + System.err.println("Consumed %d messages".format(numMessages)) + formatter.close() + connector.shutdown() + System.exit(1) + } + } + } catch { + case e: Throwable => error("Error processing message, stopping consumer: ", e) + } + System.err.println("Consumed %d messages".format(numMessages)) + System.out.flush() + formatter.close() + connector.shutdown() + } + + def tryParse(parser: OptionParser, args: Array[String]) = { + try { + parser.parse(args : _*) + } catch { + case e: OptionException => { + Utils.croak(e.getMessage) + null + } + } + } + + def checkZkPathExists(zkUrl: String, path: String): Boolean = { + try { + val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer); + zk.exists(path) + } catch { + case _: Throwable => false + } + } +} + +object MessageFormatter { + def tryParseFormatterArgs(args: Iterable[String]): Properties = { + val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0) + if(!splits.forall(_.length == 2)) { + System.err.println("Invalid parser arguments: " + args.mkString(" ")) + System.exit(1) + } + val props = new Properties + for(a <- splits) + props.put(a(0), a(1)) + props + } +} + +trait MessageFormatter { + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) + def init(props: Properties) {} + def close() {} +} + +class DefaultMessageFormatter extends MessageFormatter { + var printKey = false + var keySeparator = "\t".getBytes + var lineSeparator = "\n".getBytes + + override def init(props: Properties) { + if(props.containsKey("print.key")) + printKey = props.getProperty("print.key").trim.toLowerCase.equals("true") + 
if(props.containsKey("key.separator")) + keySeparator = props.getProperty("key.separator").getBytes + if(props.containsKey("line.separator")) + lineSeparator = props.getProperty("line.separator").getBytes + } + + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { + if(printKey) { + output.write(if (key == null) "null".getBytes() else key) + output.write(keySeparator) + } + output.write(if (value == null) "null".getBytes() else value) + output.write(lineSeparator) + } +} + +class NoOpMessageFormatter extends MessageFormatter { + override def init(props: Properties) {} + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {} +} + +class ChecksumMessageFormatter extends MessageFormatter { + private var topicStr: String = _ + + override def init(props: Properties) { + topicStr = props.getProperty("topic") + if (topicStr != null) + topicStr = topicStr + ":" + else + topicStr = "" + } + + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { + val chksum = new Message(value, key).checksum + output.println(topicStr + "checksum:" + chksum) + } +} diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala new file mode 100644 index 0000000..c250d35 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools + +import kafka.common._ +import kafka.message._ +import kafka.serializer._ +import kafka.producer.{NewShinyProducer,OldProducer,KeyedMessage} +import kafka.utils.CommandLineUtils + +import java.util.Properties +import java.io._ + +import joptsimple._ + +object ConsoleProducer { + + def main(args: Array[String]) { + + val config = new ProducerConfig(args) + val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader] + val props = new Properties + props.put("topic", config.topic) + props.putAll(config.cmdLineProps) + reader.init(System.in, props) + + try { + val producer = + if(config.useNewProducer) { + import org.apache.kafka.clients.producer.ProducerConfig + + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec) + props.put(ProducerConfig.SEND_BUFFER_CONFIG, config.socketBuffer.toString) + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.retryBackoffMs.toString) + props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.metadataExpiryMs.toString) + props.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, config.metadataFetchTimeoutMs.toString) + props.put(ProducerConfig.ACKS_CONFIG, config.requestRequiredAcks.toString) + props.put(ProducerConfig.TIMEOUT_CONFIG, config.requestTimeoutMs.toString) + props.put(ProducerConfig.RETRIES_CONFIG, config.messageSendMaxRetries.toString) + props.put(ProducerConfig.LINGER_MS_CONFIG, config.sendTimeout.toString) + if(config.queueEnqueueTimeoutMs != -1) + props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString) + props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString) + props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") + + new NewShinyProducer(props) + } else { + props.put("metadata.broker.list", config.brokerList) + props.put("compression.codec", config.compressionCodec) + props.put("producer.type", if(config.sync) "sync" else "async") + props.put("batch.num.messages", config.batchSize.toString) + props.put("message.send.max.retries", config.messageSendMaxRetries.toString) + props.put("retry.backoff.ms", config.retryBackoffMs.toString) + props.put("queue.buffering.max.ms", config.sendTimeout.toString) + props.put("queue.buffering.max.messages", config.queueSize.toString) + props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString) + props.put("request.required.acks", config.requestRequiredAcks.toString) + props.put("request.timeout.ms", config.requestTimeoutMs.toString) + props.put("key.serializer.class", config.keyEncoderClass) + props.put("serializer.class", config.valueEncoderClass) + props.put("send.buffer.bytes", config.socketBuffer.toString) + props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString) + props.put("client.id", "console-producer") + + new OldProducer(props) + } + + Runtime.getRuntime.addShutdownHook(new Thread() { + override def run() { + producer.close() + } + }) + + var message: KeyedMessage[Array[Byte], Array[Byte]] = null + do { + message = reader.readMessage() + if(message != null) + producer.send(message.topic, message.key, message.message) + } while(message != null) + } catch { + case e: Exception => + e.printStackTrace + System.exit(1) + } + System.exit(0) + } + + class ProducerConfig(args: Array[String]) { + val parser = new OptionParser + val topicOpt = parser.accepts("topic", "REQUIRED: The topic id 
to produce messages to.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.") + .withRequiredArg + .describedAs("broker-list") + .ofType(classOf[String]) + val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") + val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." + + "If specified without value, than it defaults to 'gzip'") + .withOptionalArg() + .describedAs("compression-codec") + .ofType(classOf[String]) + val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(200) + val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message.") + .withRequiredArg + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3) + val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.") + .withRequiredArg + .ofType(classOf[java.lang.Long]) + .defaultsTo(100) + val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + + " a message will queue awaiting suffient batch size. The value is given in ms.") + .withRequiredArg + .describedAs("timeout_ms") + .ofType(classOf[java.lang.Long]) + .defaultsTo(1000) + val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " + + " messages will queue awaiting suffient batch size.") + .withRequiredArg + .describedAs("queue_size") + .ofType(classOf[java.lang.Long]) + .defaultsTo(10000) + val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue") + .withRequiredArg + .describedAs("queue enqueuetimeout ms") + .ofType(classOf[java.lang.Long]) + .defaultsTo(Int.MaxValue) + val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests") + .withRequiredArg + .describedAs("request required acks") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. 
Value must be non-negative and non-zero") + .withRequiredArg + .describedAs("request timeout ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1500) + val metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms", + "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes.") + .withRequiredArg + .describedAs("metadata expiration interval") + .ofType(classOf[java.lang.Long]) + .defaultsTo(5*60*1000L) + val metadataFetchTimeoutMsOpt = parser.accepts("metadata-fetch-timeout-ms", + "The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that topic.") + .withRequiredArg + .describedAs("metadata fetch timeout") + .ofType(classOf[java.lang.Long]) + .defaultsTo(60*1000L) + val maxMemoryBytesOpt = parser.accepts("max-memory-bytes", + "The total memory used by the producer to buffer records waiting to be sent to the server.") + .withRequiredArg + .describedAs("total memory in bytes") + .ofType(classOf[java.lang.Long]) + .defaultsTo(32 * 1024 * 1024L) + val maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes", + "The buffer size allocated for a partition. When records are received which are smaller than this size the producer " + + "will attempt to optimistically group them together until this size is reached.") + .withRequiredArg + .describedAs("memory in bytes per partition") + .ofType(classOf[java.lang.Long]) + .defaultsTo(16 * 1024L) + val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.") + .withRequiredArg + .describedAs("encoder_class") + .ofType(classOf[java.lang.String]) + .defaultsTo(classOf[DefaultEncoder].getName) + val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.") + .withRequiredArg + .describedAs("encoder_class") + .ofType(classOf[java.lang.String]) + .defaultsTo(classOf[DefaultEncoder].getName) + val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " + + "By default each line is read as a separate message.") + .withRequiredArg + .describedAs("reader_class") + .ofType(classOf[java.lang.String]) + .defaultsTo(classOf[LineMessageReader].getName) + val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024*100) + val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. 
" + + "This allows custom configuration for a user-defined message reader.") + .withRequiredArg + .describedAs("prop") + .ofType(classOf[String]) + val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") + + val options = parser.parse(args : _*) + for(arg <- List(topicOpt, brokerListOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + import scala.collection.JavaConversions._ + val useNewProducer = options.has(useNewProducerOpt) + val topic = options.valueOf(topicOpt) + val brokerList = options.valueOf(brokerListOpt) + val sync = options.has(syncOpt) + val compressionCodecOptionValue = options.valueOf(compressionCodecOpt) + val compressionCodec = if (options.has(compressionCodecOpt)) + if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty) + DefaultCompressionCodec.name + else compressionCodecOptionValue + else NoCompressionCodec.name + val batchSize = options.valueOf(batchSizeOpt) + val sendTimeout = options.valueOf(sendTimeoutOpt) + val queueSize = options.valueOf(queueSizeOpt) + val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt) + val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt) + val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt) + val messageSendMaxRetries = options.valueOf(messageSendMaxRetriesOpt) + val retryBackoffMs = options.valueOf(retryBackoffMsOpt) + val keyEncoderClass = options.valueOf(keyEncoderOpt) + val valueEncoderClass = options.valueOf(valueEncoderOpt) + val readerClass = options.valueOf(messageReaderOpt) + val socketBuffer = options.valueOf(socketBufferSizeOpt) + val cmdLineProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt)) + /* new producer related configs */ + val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt) + val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt) + val metadataExpiryMs = options.valueOf(metadataExpiryMsOpt) + val metadataFetchTimeoutMs = options.valueOf(metadataFetchTimeoutMsOpt) + } + + trait MessageReader { + def init(inputStream: InputStream, props: Properties) {} + def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] + def close() {} + } + + class LineMessageReader extends MessageReader { + var topic: String = null + var reader: BufferedReader = null + var parseKey = false + var keySeparator = "\t" + var ignoreError = false + var lineNumber = 0 + + override def init(inputStream: InputStream, props: Properties) { + topic = props.getProperty("topic") + if(props.containsKey("parse.key")) + parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true") + if(props.containsKey("key.separator")) + keySeparator = props.getProperty("key.separator") + if(props.containsKey("ignore.error")) + ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true") + reader = new BufferedReader(new InputStreamReader(inputStream)) + } + + override def readMessage() = { + lineNumber += 1 + (reader.readLine(), parseKey) match { + case (null, _) => null + case (line, true) => + line.indexOf(keySeparator) match { + case -1 => + if(ignoreError) + new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes()) + else + throw new KafkaException("No key found on line " + lineNumber + ": " + line) + case n => + new KeyedMessage[Array[Byte], Array[Byte]](topic, + line.substring(0, n).getBytes, + (if(n + keySeparator.size > line.size) "" else line.substring(n + 
keySeparator.size)).getBytes()) + } + case (line, false) => + new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes()) + } + } + } +} diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala new file mode 100644 index 0000000..4688349 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicLong +import java.nio.channels.ClosedByInterruptException +import org.apache.log4j.Logger +import kafka.message.Message +import kafka.utils.ZkUtils +import java.util.{ Random, Properties } +import kafka.consumer._ +import java.text.SimpleDateFormat + +/** + * Performance test for the full zookeeper consumer + */ +object ConsumerPerformance { + private val logger = Logger.getLogger(getClass()) + + def main(args: Array[String]): Unit = { + + val config = new ConsumerPerfConfig(args) + logger.info("Starting consumer...") + var totalMessagesRead = new AtomicLong(0) + var totalBytesRead = new AtomicLong(0) + + if (!config.hideHeader) { + if (!config.showDetailedStats) + println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") + else + println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") + } + + // clean up zookeeper state for this group id for every perf run + ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId) + + val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig) + + val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads)) + var threadList = List[ConsumerPerfThread]() + for ((topic, streamList) <- topicMessageStreams) + for (i <- 0 until streamList.length) + threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, + totalMessagesRead, totalBytesRead) + + logger.info("Sleeping for 1 second.") + Thread.sleep(1000) + logger.info("starting threads") + val startMs = System.currentTimeMillis + for (thread <- threadList) + thread.start + + for (thread <- threadList) + thread.join + + val endMs = System.currentTimeMillis + val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0 + if (!config.showDetailedStats) { + val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024) + println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs), + config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, + 
totalMessagesRead.get / elapsedSecs)) + } + System.exit(0) + } + + class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLs can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val groupIdOpt = parser.accepts("group", "The group id to consume on.") + .withRequiredArg + .describedAs("gid") + .defaultsTo("perf-consumer-" + new Random().nextInt(100000)) + .ofType(classOf[String]) + val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024 * 1024) + val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + + "offset to consume from, start with the latest message present in the log rather than the earliest message.") + val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the TCP socket receive buffer.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(2 * 1024 * 1024) + val numThreadsOpt = parser.accepts("threads", "Number of processing threads.") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(10) + val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + + val options = parser.parse(args: _*) + + for (arg <- List(topicOpt, zkConnectOpt)) { + if (!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + val props = new Properties + props.put("group.id", options.valueOf(groupIdOpt)) + props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) + props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) + props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest") + props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) + props.put("consumer.timeout.ms", "5000") + props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString) + val consumerConfig = new ConsumerConfig(props) + val numThreads = options.valueOf(numThreadsOpt).intValue + val topic = options.valueOf(topicOpt) + val numMessages = options.valueOf(numMessagesOpt).longValue + val reportingInterval = options.valueOf(reportingIntervalOpt).intValue + val showDetailedStats = options.has(showDetailedStatsOpt) + val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) + val hideHeader = options.has(hideHeaderOpt) + } + + class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]], + config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) + extends Thread(name) { + + override def run() { + var bytesRead = 0L + var messagesRead = 0L + val startMs = System.currentTimeMillis + var lastReportTime: Long = startMs + var lastBytesRead = 0L + var lastMessagesRead = 0L + + try { + val iter = stream.iterator + while (iter.hasNext && messagesRead < config.numMessages) { + val 
messageAndMetadata = iter.next + messagesRead += 1 + bytesRead += messageAndMetadata.message.length + + if (messagesRead % config.reportingInterval == 0) { + if (config.showDetailedStats) + printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis) + lastReportTime = System.currentTimeMillis + lastMessagesRead = messagesRead + lastBytesRead = bytesRead + } + } + } catch { + case _: InterruptedException => + case _: ClosedByInterruptException => + case _: ConsumerTimeoutException => + case e: Throwable => e.printStackTrace() + } + totalMessagesRead.addAndGet(messagesRead) + totalBytesRead.addAndGet(bytesRead) + if (config.showDetailedStats) + printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis) + } + + private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long, + startMs: Long, endMs: Long) = { + val elapsedMs = endMs - startMs + val totalMBRead = (bytesRead * 1.0) / (1024 * 1024) + val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024) + println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id, + config.consumerConfig.fetchMessageMaxBytes, totalMBRead, + 1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0)) + } + } + +} diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala new file mode 100644 index 0000000..129cc01 --- /dev/null +++ b/core/src/main/scala/kafka/tools/PerfConfig.scala @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package kafka.tools + +import joptsimple.OptionParser + + +class PerfConfig(args: Array[String]) { + val parser = new OptionParser + val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Long]) + .defaultsTo(Long.MaxValue) + val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(5000) + val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. 
" + + "See java.text.SimpleDateFormat for options.") + .withRequiredArg + .describedAs("date format") + .ofType(classOf[String]) + .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS") + val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + + "interval as configured by reporting-interval") + val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ") + val messageSizeOpt = parser.accepts("message-size", "The size of each message.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100) + val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(200) + val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") + .withRequiredArg + .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val helpOpt = parser.accepts("help", "Print usage.") +} diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala new file mode 100644 index 0000000..95cfbc1 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools + +import kafka.metrics.KafkaMetricsReporter +import kafka.producer.{OldProducer, NewShinyProducer} +import kafka.utils.{VerifiableProperties, Logging} +import kafka.message.CompressionCodec +import kafka.serializer._ + +import java.util.concurrent.{CountDownLatch, Executors} +import java.util.concurrent.atomic.AtomicLong +import java.util._ +import java.text.SimpleDateFormat +import java.math.BigInteger +import scala.collection.immutable.List + +import org.apache.log4j.Logger + +/** + * Load test for the producer + */ +object ProducerPerformance extends Logging { + + def main(args: Array[String]) { + + val logger = Logger.getLogger(getClass) + val config = new ProducerPerfConfig(args) + if (!config.isFixedSize) + logger.info("WARN: Throughput will be slower due to changing message size per request") + + val totalBytesSent = new AtomicLong(0) + val totalMessagesSent = new AtomicLong(0) + val executor = Executors.newFixedThreadPool(config.numThreads) + val allDone = new CountDownLatch(config.numThreads) + val startMs = System.currentTimeMillis + val rand = new java.util.Random + + if (!config.hideHeader) + println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " + + "total.data.sent.in.nMsg, nMsg.sec") + + for (i <- 0 until config.numThreads) { + executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand)) + } + + allDone.await() + val endMs = System.currentTimeMillis + val elapsedSecs = (endMs - startMs) / 1000.0 + val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024) + println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format( + config.dateFormat.format(startMs), config.dateFormat.format(endMs), + config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent, + totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs)) + System.exit(0) + } + + class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) { + val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap).") + .withRequiredArg + .describedAs("hostname:port,..,hostname:port") + .ofType(classOf[String]) + val topicsOpt = parser.accepts("topics", "REQUIRED: The comma-separated list of topics to produce to") + .withRequiredArg + .describedAs("topic1,topic2..") + .ofType(classOf[String]) + val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3000) + val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The number of producer retries") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3) + val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100) + val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " + + "to complete") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(-1) + val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.") + val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.") + val numThreadsOpt = parser.accepts("threads", "Number of sending threads.") + .withRequiredArg + .describedAs("number of threads") + 
.ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + val initialMessageIdOpt = parser.accepts("initial-message-id", "This is used for generating test data. If set, messages will be tagged with an " + + "ID and sent by the producer starting from this ID sequentially. Message content will be String type and " + + "in the form of 'Message:000...1:xxx...'") + .withRequiredArg() + .describedAs("initial message id") + .ofType(classOf[java.lang.Integer]) + val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for the specified time between two sends") + .withRequiredArg() + .describedAs("message send time gap") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") + val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enabled is set, and this parameter is " + + "set, the CSV metrics will be output here") + .withRequiredArg + .describedAs("metrics directory") + .ofType(classOf[java.lang.String]) + val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") + + val options = parser.parse(args: _*) + for (arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) { + if (!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + val topicsStr = options.valueOf(topicsOpt) + val topics = topicsStr.split(",") + val numMessages = options.valueOf(numMessagesOpt).longValue + val reportingInterval = options.valueOf(reportingIntervalOpt).intValue + val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) + val hideHeader = options.has(hideHeaderOpt) + val brokerList = options.valueOf(brokerListOpt) + val messageSize = options.valueOf(messageSizeOpt).intValue + var isFixedSize = !options.has(varyMessageSizeOpt) + var isSync = options.has(syncOpt) + var batchSize = options.valueOf(batchSizeOpt).intValue + var numThreads = options.valueOf(numThreadsOpt).intValue + val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue) + val seqIdMode = options.has(initialMessageIdOpt) + var initialMessageId: Int = 0 + if (seqIdMode) + initialMessageId = options.valueOf(initialMessageIdOpt).intValue() + val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue() + val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue() + val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue() + val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue() + val useNewProducer = options.has(useNewProducerOpt) + + val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) + + if (csvMetricsReporterEnabled) { + val props = new Properties() + props.put("kafka.metrics.polling.interval.secs", "1") + props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") + if (options.has(metricsDirectoryOpt)) + props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) + else + props.put("kafka.csv.metrics.dir", "kafka_metrics") + props.put("kafka.csv.metrics.reporter.enabled", "true") + val verifiableProps = new VerifiableProperties(props) + KafkaMetricsReporter.startReporters(verifiableProps) + } + + val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue() + } + + class ProducerThread(val threadId: Int, + val config: ProducerPerfConfig, 
+ val totalBytesSent: AtomicLong, + val totalMessagesSent: AtomicLong, + val allDone: CountDownLatch, + val rand: Random) extends Runnable { + val seqIdNumDigit = 10 // no. of digits for max int value + + val messagesPerThread = config.numMessages / config.numThreads + debug("Messages per thread = " + messagesPerThread) + val props = new Properties() + val producer = + if (config.useNewProducer) { + import org.apache.kafka.clients.producer.ProducerConfig + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) + props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) + props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance") + props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString) + props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString) + props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) + new NewShinyProducer(props) + } else { + props.put("metadata.broker.list", config.brokerList) + props.put("compression.codec", config.compressionCodec.codec.toString) + props.put("send.buffer.bytes", (64 * 1024).toString) + if (!config.isSync) { + props.put("producer.type", "async") + props.put("batch.num.messages", config.batchSize.toString) + props.put("queue.enqueue.timeout.ms", "-1") + } + props.put("client.id", "producer-performance") + props.put("request.required.acks", config.producerRequestRequiredAcks.toString) + props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) + props.put("message.send.max.retries", config.producerNumRetries.toString) + props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) + props.put("serializer.class", classOf[DefaultEncoder].getName) + props.put("key.serializer.class", classOf[NullEncoder[Long]].getName) + new OldProducer(props) + } + + // generate the sequential message ID + private val SEP = ":" // message field separator + private val messageIdLabel = "MessageID" + private val threadIdLabel = "ThreadID" + private val topicLabel = "Topic" + private var leftPaddedSeqId: String = "" + + private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = { + // Each thread gets a unique range of sequential no. for its ids. + // Eg. 1000 msg in 10 threads => 100 msg per thread + // thread 0 IDs : 0 ~ 99 + // thread 1 IDs : 100 ~ 199 + // thread 2 IDs : 200 ~ 299 + // . . . 
+ leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId)) + + val msgHeader = topicLabel + SEP + + topic + SEP + + threadIdLabel + SEP + + threadId + SEP + + messageIdLabel + SEP + + leftPaddedSeqId + SEP + + val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x') + debug(seqMsgString) + return seqMsgString.getBytes() + } + + private def generateProducerData(topic: String, messageId: Long): Array[Byte] = { + val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize) + if (config.seqIdMode) { + val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId + generateMessageWithSeqId(topic, seqId, msgSize) + } else { + new Array[Byte](msgSize) + } + } + + override def run { + var bytesSent = 0L + var nSends = 0 + var i: Long = 0L + var message: Array[Byte] = null + + while (i < messagesPerThread) { + try { + config.topics.foreach( + topic => { + message = generateProducerData(topic, i) + producer.send(topic, BigInteger.valueOf(i).toByteArray, message) + bytesSent += message.size + nSends += 1 + if (config.messageSendGapMs > 0) + Thread.sleep(config.messageSendGapMs) + }) + } catch { + case e: Throwable => error("Error when sending message " + new String(message), e) + } + i += 1 + } + try { + producer.close() + } catch { + case e: Throwable => error("Error when closing producer", e) + } + totalBytesSent.addAndGet(bytesSent) + totalMessagesSent.addAndGet(nSends) + allDone.countDown() + } + } +} diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala new file mode 100644 index 0000000..8b8c472 --- /dev/null +++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools + +import java.net.URI +import java.text.SimpleDateFormat +import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} +import kafka.consumer.SimpleConsumer +import kafka.utils._ +import org.apache.log4j.Logger +import kafka.common.TopicAndPartition + + +/** + * Performance test for the simple consumer + */ +object SimpleConsumerPerformance { + + def main(args: Array[String]) { + val logger = Logger.getLogger(getClass) + val config = new ConsumerPerfConfig(args) + + if(!config.hideHeader) { + if(!config.showDetailedStats) + println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") + else + println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") + } + + val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId) + + // reset to latest or smallest offset + val topicAndPartition = TopicAndPartition(config.topic, config.partition) + val request = OffsetRequest(Map( + topicAndPartition -> PartitionOffsetRequestInfo(if (config.fromLatest) OffsetRequest.LatestTime else OffsetRequest.EarliestTime, 1) + )) + var offset: Long = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head + + val startMs = System.currentTimeMillis + var done = false + var totalBytesRead = 0L + var totalMessagesRead = 0L + var consumedInterval = 0 + var lastReportTime: Long = startMs + var lastBytesRead = 0L + var lastMessagesRead = 0L + while(!done) { + // TODO: add in the maxWait and minBytes for performance + val request = new FetchRequestBuilder() + .clientId(config.clientId) + .addFetch(config.topic, config.partition, offset, config.fetchSize) + .build() + val fetchResponse = consumer.fetch(request) + + var messagesRead = 0 + var bytesRead = 0 + val messageSet = fetchResponse.messageSet(config.topic, config.partition) + for (message <- messageSet) { + messagesRead += 1 + bytesRead += message.message.payloadSize + } + + if(messagesRead == 0 || totalMessagesRead > config.numMessages) + done = true + else + // we only did one fetch so we find the offset for the first (head) messageset + offset += messageSet.validBytes + + totalBytesRead += bytesRead + totalMessagesRead += messagesRead + consumedInterval += messagesRead + + if(consumedInterval > config.reportingInterval) { + if(config.showDetailedStats) { + val reportTime = System.currentTimeMillis + val elapsed = (reportTime - lastReportTime)/1000.0 + val totalMBRead = ((totalBytesRead-lastBytesRead)*1.0)/(1024*1024) + println(("%s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(reportTime), config.fetchSize, + (totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed, + totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed)) + } + lastReportTime = SystemTime.milliseconds + lastBytesRead = totalBytesRead + lastMessagesRead = totalMessagesRead + consumedInterval = 0 + } + } + val reportTime = System.currentTimeMillis + val elapsed = (reportTime - startMs) / 1000.0 + + if(!config.showDetailedStats) { + val totalMBRead = (totalBytesRead*1.0)/(1024*1024) + println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), + config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, totalMBRead/elapsed, + totalMessagesRead, totalMessagesRead/elapsed)) + } + System.exit(0) + } + + class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { + val urlOpt = parser.accepts("server", "REQUIRED: The 
hostname of the server to connect to.") + .withRequiredArg + .describedAs("kafka://hostname:port") + .ofType(classOf[String]) + val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + + "offset to consume from, start with the latest message present in the log rather than the earliest message.") + val partitionOpt = parser.accepts("partition", "The topic partition to consume from.") + .withRequiredArg + .describedAs("partition") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.") + .withRequiredArg + .describedAs("bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024*1024) + val clientIdOpt = parser.accepts("clientId", "The ID of this client.") + .withRequiredArg + .describedAs("clientId") + .ofType(classOf[String]) + .defaultsTo("SimpleConsumerPerformanceClient") + + val options = parser.parse(args : _*) + + for(arg <- List(topicOpt, urlOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + val url = new URI(options.valueOf(urlOpt)) + val fetchSize = options.valueOf(fetchSizeOpt).intValue + val fromLatest = options.has(resetBeginningOffsetOpt) + val partition = options.valueOf(partitionOpt).intValue + val topic = options.valueOf(topicOpt) + val numMessages = options.valueOf(numMessagesOpt).longValue + val reportingInterval = options.valueOf(reportingIntervalOpt).intValue + val showDetailedStats = options.has(showDetailedStatsOpt) + val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) + val hideHeader = options.has(hideHeaderOpt) + val clientId = options.valueOf(clientIdOpt).toString + } +} diff --git a/perf/config/log4j.properties b/perf/config/log4j.properties deleted file mode 100644 index 542b739..0000000 --- a/perf/config/log4j.properties +++ /dev/null @@ -1,24 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-log4j.rootLogger=INFO, fileAppender - -log4j.appender.fileAppender=org.apache.log4j.FileAppender -log4j.appender.fileAppender.File=perf.log -log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout -log4j.appender.fileAppender.layout.ConversionPattern=%m %n - -# Turn on all our debugging info -log4j.logger.kafka=INFO - diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala deleted file mode 100644 index 4dde468..0000000 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.perf - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicLong -import java.nio.channels.ClosedByInterruptException -import org.apache.log4j.Logger -import kafka.message.Message -import kafka.utils.ZkUtils -import java.util.{ Random, Properties } -import kafka.consumer._ -import java.text.SimpleDateFormat - -/** - * Performance test for the full zookeeper consumer - */ -object ConsumerPerformance { - private val logger = Logger.getLogger(getClass()) - - def main(args: Array[String]): Unit = { - - val config = new ConsumerPerfConfig(args) - logger.info("Starting consumer...") - var totalMessagesRead = new AtomicLong(0) - var totalBytesRead = new AtomicLong(0) - - if (!config.hideHeader) { - if (!config.showDetailedStats) - println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") - else - println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") - } - - // clean up zookeeper state for this group id for every perf run - ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId) - - val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig) - - val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads)) - var threadList = List[ConsumerPerfThread]() - for ((topic, streamList) <- topicMessageStreams) - for (i <- 0 until streamList.length) - threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, - totalMessagesRead, totalBytesRead) - - logger.info("Sleeping for 1 second.") - Thread.sleep(1000) - logger.info("starting threads") - val startMs = System.currentTimeMillis - for (thread <- threadList) - thread.start - - for (thread <- threadList) - thread.join - - val endMs = System.currentTimeMillis - val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0 - if (!config.showDetailedStats) { - val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024) - println(("%s, %s, %d, %.4f, %.4f, %d, 
%.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs), - config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, - totalMessagesRead.get / elapsedSecs)) - } - System.exit(0) - } - - class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val groupIdOpt = parser.accepts("group", "The group id to consume on.") - .withRequiredArg - .describedAs("gid") - .defaultsTo("perf-consumer-" + new Random().nextInt(100000)) - .ofType(classOf[String]) - val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024 * 1024) - val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + - "offset to consume from, start with the latest message present in the log rather than the earliest message.") - val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(2 * 1024 * 1024) - val numThreadsOpt = parser.accepts("threads", "Number of processing threads.") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(10) - val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - - val options = parser.parse(args: _*) - - for (arg <- List(topicOpt, zkConnectOpt)) { - if (!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - - val props = new Properties - props.put("group.id", options.valueOf(groupIdOpt)) - props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) - props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) - props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest") - props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) - props.put("consumer.timeout.ms", "5000") - props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString) - val consumerConfig = new ConsumerConfig(props) - val numThreads = options.valueOf(numThreadsOpt).intValue - val topic = options.valueOf(topicOpt) - val numMessages = options.valueOf(numMessagesOpt).longValue - val reportingInterval = options.valueOf(reportingIntervalOpt).intValue - val showDetailedStats = options.has(showDetailedStatsOpt) - val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) - val hideHeader = options.has(hideHeaderOpt) - } - - class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]], - config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) - extends Thread(name) { - - override def run() { - var bytesRead = 0L - var messagesRead = 0L - val startMs = System.currentTimeMillis - var 
lastReportTime: Long = startMs - var lastBytesRead = 0L - var lastMessagesRead = 0L - - try { - val iter = stream.iterator - while (iter.hasNext && messagesRead < config.numMessages) { - val messageAndMetadata = iter.next - messagesRead += 1 - bytesRead += messageAndMetadata.message.length - - if (messagesRead % config.reportingInterval == 0) { - if (config.showDetailedStats) - printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis) - lastReportTime = System.currentTimeMillis - lastMessagesRead = messagesRead - lastBytesRead = bytesRead - } - } - } catch { - case _: InterruptedException => - case _: ClosedByInterruptException => - case _: ConsumerTimeoutException => - case e: Throwable => e.printStackTrace() - } - totalMessagesRead.addAndGet(messagesRead) - totalBytesRead.addAndGet(bytesRead) - if (config.showDetailedStats) - printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis) - } - - private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long, - startMs: Long, endMs: Long) = { - val elapsedMs = endMs - startMs - val totalMBRead = (bytesRead * 1.0) / (1024 * 1024) - val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024) - println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id, - config.consumerConfig.fetchMessageMaxBytes, totalMBRead, - 1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0)) - } - } - -} diff --git a/perf/src/main/scala/kafka/perf/PerfConfig.scala b/perf/src/main/scala/kafka/perf/PerfConfig.scala deleted file mode 100644 index a8fc6b9..0000000 --- a/perf/src/main/scala/kafka/perf/PerfConfig.scala +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package kafka.perf - -import joptsimple.OptionParser - - -class PerfConfig(args: Array[String]) { - val parser = new OptionParser - val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Long]) - .defaultsTo(Long.MaxValue) - val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(5000) - val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. 
" + - "See java.text.SimpleDateFormat for options.") - .withRequiredArg - .describedAs("date format") - .ofType(classOf[String]) - .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS") - val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + - "interval as configured by reporting-interval") - val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ") - val messageSizeOpt = parser.accepts("message-size", "The size of each message.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(100) - val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(200) - val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") - .withRequiredArg - .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) - val helpOpt = parser.accepts("help", "Print usage.") -} diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala deleted file mode 100644 index 00fa90b..0000000 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.perf - -import kafka.metrics.KafkaMetricsReporter -import kafka.producer.{OldProducer, NewShinyProducer} -import kafka.utils.{VerifiableProperties, Logging} -import kafka.message.CompressionCodec -import kafka.serializer._ - -import java.util.concurrent.{CountDownLatch, Executors} -import java.util.concurrent.atomic.AtomicLong -import java.util._ -import java.text.SimpleDateFormat -import java.math.BigInteger -import scala.collection.immutable.List - -import org.apache.log4j.Logger - -/** - * Load test for the producer - */ -object ProducerPerformance extends Logging { - - def main(args: Array[String]) { - - val logger = Logger.getLogger(getClass) - val config = new ProducerPerfConfig(args) - if (!config.isFixedSize) - logger.info("WARN: Throughput will be slower due to changing message size per request") - - val totalBytesSent = new AtomicLong(0) - val totalMessagesSent = new AtomicLong(0) - val executor = Executors.newFixedThreadPool(config.numThreads) - val allDone = new CountDownLatch(config.numThreads) - val startMs = System.currentTimeMillis - val rand = new java.util.Random - - if (!config.hideHeader) - println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " + - "total.data.sent.in.nMsg, nMsg.sec") - - for (i <- 0 until config.numThreads) { - executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand)) - } - - allDone.await() - val endMs = System.currentTimeMillis - val elapsedSecs = (endMs - startMs) / 1000.0 - val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024) - println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format( - config.dateFormat.format(startMs), config.dateFormat.format(endMs), - config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent, - totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs)) - System.exit(0) - } - - class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) { - val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap.") - .withRequiredArg - .describedAs("hostname:port,..,hostname:port") - .ofType(classOf[String]) - val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to") - .withRequiredArg - .describedAs("topic1,topic2..") - .ofType(classOf[String]) - val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(3000) - val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(3) - val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(100) - val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " + - "to complete") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(-1) - val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.") - val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.") - val numThreadsOpt = parser.accepts("threads", "Number of sending threads.") - .withRequiredArg - .describedAs("number of threads") - 
.ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " + - "ID and sent by producer starting from this ID sequentially. Message content will be String type and " + - "in the form of 'Message:000...1:xxx...'") - .withRequiredArg() - .describedAs("initial message id") - .ofType(classOf[java.lang.Integer]) - val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends") - .withRequiredArg() - .describedAs("message send time gap") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) - val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") - val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + - "set, the csv metrics will be outputed here") - .withRequiredArg - .describedAs("metrics dictory") - .ofType(classOf[java.lang.String]) - val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") - - val options = parser.parse(args: _*) - for (arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) { - if (!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - val topicsStr = options.valueOf(topicsOpt) - val topics = topicsStr.split(",") - val numMessages = options.valueOf(numMessagesOpt).longValue - val reportingInterval = options.valueOf(reportingIntervalOpt).intValue - val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) - val hideHeader = options.has(hideHeaderOpt) - val brokerList = options.valueOf(brokerListOpt) - val messageSize = options.valueOf(messageSizeOpt).intValue - var isFixedSize = !options.has(varyMessageSizeOpt) - var isSync = options.has(syncOpt) - var batchSize = options.valueOf(batchSizeOpt).intValue - var numThreads = options.valueOf(numThreadsOpt).intValue - val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue) - val seqIdMode = options.has(initialMessageIdOpt) - var initialMessageId: Int = 0 - if (seqIdMode) - initialMessageId = options.valueOf(initialMessageIdOpt).intValue() - val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue() - val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue() - val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue() - val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue() - val useNewProducer = options.has(useNewProducerOpt) - - val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) - - if (csvMetricsReporterEnabled) { - val props = new Properties() - props.put("kafka.metrics.polling.interval.secs", "1") - props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") - if (options.has(metricsDirectoryOpt)) - props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) - else - props.put("kafka.csv.metrics.dir", "kafka_metrics") - props.put("kafka.csv.metrics.reporter.enabled", "true") - val verifiableProps = new VerifiableProperties(props) - KafkaMetricsReporter.startReporters(verifiableProps) - } - - val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue() - } - - class ProducerThread(val threadId: Int, - val config: ProducerPerfConfig, 
- val totalBytesSent: AtomicLong, - val totalMessagesSent: AtomicLong, - val allDone: CountDownLatch, - val rand: Random) extends Runnable { - val seqIdNumDigit = 10 // no. of digits for max int value - - val messagesPerThread = config.numMessages / config.numThreads - debug("Messages per thread = " + messagesPerThread) - val props = new Properties() - val producer = - if (config.useNewProducer) { - import org.apache.kafka.clients.producer.ProducerConfig - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) - props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) - props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance") - props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString) - props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString) - props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) - props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) - new NewShinyProducer(props) - } else { - props.put("metadata.broker.list", config.brokerList) - props.put("compression.codec", config.compressionCodec.codec.toString) - props.put("send.buffer.bytes", (64 * 1024).toString) - if (!config.isSync) { - props.put("producer.type", "async") - props.put("batch.num.messages", config.batchSize.toString) - props.put("queue.enqueue.timeout.ms", "-1") - } - props.put("client.id", "producer-performance") - props.put("request.required.acks", config.producerRequestRequiredAcks.toString) - props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) - props.put("message.send.max.retries", config.producerNumRetries.toString) - props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) - props.put("serializer.class", classOf[DefaultEncoder].getName) - props.put("key.serializer.class", classOf[NullEncoder[Long]].getName) - new OldProducer(props) - } - - // generate the sequential message ID - private val SEP = ":" // message field separator - private val messageIdLabel = "MessageID" - private val threadIdLabel = "ThreadID" - private val topicLabel = "Topic" - private var leftPaddedSeqId: String = "" - - private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = { - // Each thread gets a unique range of sequential no. for its ids. - // Eg. 1000 msg in 10 threads => 100 msg per thread - // thread 0 IDs : 0 ~ 99 - // thread 1 IDs : 100 ~ 199 - // thread 2 IDs : 200 ~ 299 - // . . . 
- leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId)) - - val msgHeader = topicLabel + SEP + - topic + SEP + - threadIdLabel + SEP + - threadId + SEP + - messageIdLabel + SEP + - leftPaddedSeqId + SEP - - val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x') - debug(seqMsgString) - return seqMsgString.getBytes() - } - - private def generateProducerData(topic: String, messageId: Long): Array[Byte] = { - val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize) - if (config.seqIdMode) { - val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId - generateMessageWithSeqId(topic, seqId, msgSize) - } else { - new Array[Byte](msgSize) - } - } - - override def run { - var bytesSent = 0L - var nSends = 0 - var i: Long = 0L - var message: Array[Byte] = null - - while (i < messagesPerThread) { - try { - config.topics.foreach( - topic => { - message = generateProducerData(topic, i) - producer.send(topic, BigInteger.valueOf(i).toByteArray, message) - bytesSent += message.size - nSends += 1 - if (config.messageSendGapMs > 0) - Thread.sleep(config.messageSendGapMs) - }) - } catch { - case e: Throwable => error("Error when sending message " + new String(message), e) - } - i += 1 - } - try { - producer.close() - } catch { - case e: Throwable => error("Error when closing producer", e) - } - totalBytesSent.addAndGet(bytesSent) - totalMessagesSent.addAndGet(nSends) - allDone.countDown() - } - } -} diff --git a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala deleted file mode 100644 index c52ada0..0000000 --- a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.perf - -import java.net.URI -import java.text.SimpleDateFormat -import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} -import kafka.consumer.SimpleConsumer -import kafka.utils._ -import org.apache.log4j.Logger -import kafka.common.TopicAndPartition - - -/** - * Performance test for the simple consumer - */ -object SimpleConsumerPerformance { - - def main(args: Array[String]) { - val logger = Logger.getLogger(getClass) - val config = new ConsumerPerfConfig(args) - - if(!config.hideHeader) { - if(!config.showDetailedStats) - println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") - else - println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") - } - - val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId) - - // reset to latest or smallest offset - val topicAndPartition = TopicAndPartition(config.topic, config.partition) - val request = OffsetRequest(Map( - topicAndPartition -> PartitionOffsetRequestInfo(if (config.fromLatest) OffsetRequest.LatestTime else OffsetRequest.EarliestTime, 1) - )) - var offset: Long = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head - - val startMs = System.currentTimeMillis - var done = false - var totalBytesRead = 0L - var totalMessagesRead = 0L - var consumedInterval = 0 - var lastReportTime: Long = startMs - var lastBytesRead = 0L - var lastMessagesRead = 0L - while(!done) { - // TODO: add in the maxWait and minBytes for performance - val request = new FetchRequestBuilder() - .clientId(config.clientId) - .addFetch(config.topic, config.partition, offset, config.fetchSize) - .build() - val fetchResponse = consumer.fetch(request) - - var messagesRead = 0 - var bytesRead = 0 - val messageSet = fetchResponse.messageSet(config.topic, config.partition) - for (message <- messageSet) { - messagesRead += 1 - bytesRead += message.message.payloadSize - } - - if(messagesRead == 0 || totalMessagesRead > config.numMessages) - done = true - else - // we only did one fetch so we find the offset for the first (head) messageset - offset += messageSet.validBytes - - totalBytesRead += bytesRead - totalMessagesRead += messagesRead - consumedInterval += messagesRead - - if(consumedInterval > config.reportingInterval) { - if(config.showDetailedStats) { - val reportTime = System.currentTimeMillis - val elapsed = (reportTime - lastReportTime)/1000.0 - val totalMBRead = ((totalBytesRead-lastBytesRead)*1.0)/(1024*1024) - println(("%s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(reportTime), config.fetchSize, - (totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed, - totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed)) - } - lastReportTime = SystemTime.milliseconds - lastBytesRead = totalBytesRead - lastMessagesRead = totalMessagesRead - consumedInterval = 0 - } - } - val reportTime = System.currentTimeMillis - val elapsed = (reportTime - startMs) / 1000.0 - - if(!config.showDetailedStats) { - val totalMBRead = (totalBytesRead*1.0)/(1024*1024) - println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), - config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, totalMBRead/elapsed, - totalMessagesRead, totalMessagesRead/elapsed)) - } - System.exit(0) - } - - class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { - val urlOpt = parser.accepts("server", "REQUIRED: The 
hostname of the server to connect to.") - .withRequiredArg - .describedAs("kafka://hostname:port") - .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + - "offset to consume from, start with the latest message present in the log rather than the earliest message.") - val partitionOpt = parser.accepts("partition", "The topic partition to consume from.") - .withRequiredArg - .describedAs("partition") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) - val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.") - .withRequiredArg - .describedAs("bytes") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024*1024) - val clientIdOpt = parser.accepts("clientId", "The ID of this client.") - .withRequiredArg - .describedAs("clientId") - .ofType(classOf[String]) - .defaultsTo("SimpleConsumerPerformanceClient") - - val options = parser.parse(args : _*) - - for(arg <- List(topicOpt, urlOpt)) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - val url = new URI(options.valueOf(urlOpt)) - val fetchSize = options.valueOf(fetchSizeOpt).intValue - val fromLatest = options.has(resetBeginningOffsetOpt) - val partition = options.valueOf(partitionOpt).intValue - val topic = options.valueOf(topicOpt) - val numMessages = options.valueOf(numMessagesOpt).longValue - val reportingInterval = options.valueOf(reportingIntervalOpt).intValue - val showDetailedStats = options.has(showDetailedStatsOpt) - val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) - val hideHeader = options.has(hideHeaderOpt) - val clientId = options.valueOf(clientIdOpt).toString - } -}