diff --git a/bin/kafka-console-consumer.sh b/bin/kafka-console-consumer.sh
index e410dde..07c90a9 100755
--- a/bin/kafka-console-consumer.sh
+++ b/bin/kafka-console-consumer.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-#
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-#
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.consumer.ConsoleConsumer $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer $@
diff --git a/bin/kafka-consumer-perf-test.sh b/bin/kafka-consumer-perf-test.sh
index 4ed3ed9..ebc513a 100755
--- a/bin/kafka-consumer-perf-test.sh
+++ b/bin/kafka-consumer-perf-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-#
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-#
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,4 +17,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.perf.ConsumerPerformance $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsumerPerformance $@
diff --git a/bin/kafka-producer-perf-test.sh b/bin/kafka-producer-perf-test.sh
index b4efc29..84ac949 100755
--- a/bin/kafka-producer-perf-test.sh
+++ b/bin/kafka-producer-perf-test.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-#
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-#
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,4 +17,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
 export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
 
-exec $(dirname $0)/kafka-run-class.sh kafka.perf.ProducerPerformance $@
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.ProducerPerformance $@
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index d2fc8c0..5d5021d 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -5,9 +5,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License. You may obtain a copy of the License at
-#
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-#
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -41,11 +41,6 @@ do CLASSPATH=$CLASSPATH:$file done -for file in $base_dir/perf/build/libs//kafka-perf_${SCALA_VERSION}*.jar; -do - CLASSPATH=$CLASSPATH:$file -done - for file in $base_dir/examples/build/libs//kafka-examples*.jar; do CLASSPATH=$CLASSPATH:$file @@ -155,6 +150,3 @@ if [ "x$DAEMON_MODE" = "xtrue" ]; then else exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" fi - - - diff --git a/bin/kafka-simple-consumer-perf-test.sh b/bin/kafka-simple-consumer-perf-test.sh index 2d3e3d3..b1a5cfc 100755 --- a/bin/kafka-simple-consumer-perf-test.sh +++ b/bin/kafka-simple-consumer-perf-test.sh @@ -5,9 +5,9 @@ # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M" fi -exec $(dirname $0)/kafka-run-class.sh kafka.perf.SimpleConsumerPerformance $@ +exec $(dirname $0)/kafka-run-class.sh kafka.tools.SimpleConsumerPerformance $@ diff --git a/bin/windows/kafka-console-consumer.bat b/bin/windows/kafka-console-consumer.bat index 94b20a4..f70f98a 100644 --- a/bin/windows/kafka-console-consumer.bat +++ b/bin/windows/kafka-console-consumer.bat @@ -16,5 +16,5 @@ rem limitations under the License. SetLocal set KAFKA_HEAP_OPTS=-Xmx512M -%~dp0kafka-run-class.bat kafka.consumer.ConsoleConsumer %* +%~dp0kafka-run-class.bat kafka.tools.ConsoleConsumer %* EndLocal diff --git a/bin/windows/kafka-run-class.bat b/bin/windows/kafka-run-class.bat index f4d2904..2f781c6 100644 --- a/bin/windows/kafka-run-class.bat +++ b/bin/windows/kafka-run-class.bat @@ -36,11 +36,6 @@ for %%i in (%BASE_DIR%\core\build\dependant-libs-%SCALA_VERSION%\*.jar) do ( call :concat %%i ) -rem Classpath addition for kafka-perf dependencies -for %%i in (%BASE_DIR%\perf\build\dependant-libs-%SCALA_VERSION%\*.jar) do ( - call :concat %%i -) - rem Classpath addition for kafka-clients for %%i in (%BASE_DIR%\clients\build\libs\kafka-clients-*.jar) do ( call :concat %%i @@ -126,4 +121,4 @@ IF ["%CLASSPATH%"] EQU [""] ( set CLASSPATH="%1" ) ELSE ( set CLASSPATH=%CLASSPATH%;"%1" -) \ No newline at end of file +) diff --git a/build.gradle b/build.gradle index 2f4167f..b3bbd77 100644 --- a/build.gradle +++ b/build.gradle @@ -365,7 +365,6 @@ project(':clients') { dependencies { compile "org.slf4j:slf4j-api:1.7.6" compile 'org.xerial.snappy:snappy-java:1.0.5' - compile 'net.jpountz.lz4:lz4:1.2.0' testCompile 'com.novocode:junit-interface:0.9' testRuntime "$slf4jlog4j" diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 5227b2d..c557e44 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -20,7 +20,7 @@ package org.apache.kafka.common.record; * The compression type to use */ public enum CompressionType { - NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f), LZ4HC(4, "lz4hc", 0.5f); + NONE(0, 
"none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f); public final int id; public final String name; @@ -40,10 +40,6 @@ public enum CompressionType { return GZIP; case 2: return SNAPPY; - case 3: - return LZ4; - case 4: - return LZ4HC; default: throw new IllegalArgumentException("Unknown compression type id: " + id); } @@ -56,10 +52,6 @@ public enum CompressionType { return GZIP; else if (SNAPPY.name.equals(name)) return SNAPPY; - else if (LZ4.name.equals(name)) - return LZ4; - else if (LZ4HC.name.equals(name)) - return LZ4HC; else throw new IllegalArgumentException("Unknown compression name: " + name); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java index 0fa6dd2..6ae3d06 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java @@ -208,29 +208,6 @@ public class Compressor { } catch (Exception e) { throw new KafkaException(e); } - case LZ4: - try { - Class LZ4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream"); - OutputStream stream = (OutputStream) LZ4BlockOutputStream.getConstructor(OutputStream.class) - .newInstance(buffer); - return new DataOutputStream(stream); - } catch (Exception e) { - throw new KafkaException(e); - } - case LZ4HC: - try { - Class factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory"); - Class compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor"); - Class lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream"); - Object factory = factoryClass.getMethod("fastestInstance").invoke(null); - Object compressor = factoryClass.getMethod("highCompressor").invoke(factory); - OutputStream stream = (OutputStream) lz4BlockOutputStream - .getConstructor(OutputStream.class, Integer.TYPE, compressorClass) - .newInstance(buffer, 1 << 16, compressor); - return new DataOutputStream(stream); - } catch (Exception e) { - throw new KafkaException(e); - } default: throw new IllegalArgumentException("Unknown compression type: " + type); } @@ -257,17 +234,6 @@ public class Compressor { } catch (Exception e) { throw new KafkaException(e); } - case LZ4: - case LZ4HC: - // dynamically load LZ4 class to avoid runtime dependency - try { - Class inputStreamClass = Class.forName("net.jpountz.lz4.LZ4BlockInputStream"); - InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class) - .newInstance(buffer); - return new DataInputStream(stream); - } catch (Exception e) { - throw new KafkaException(e); - } default: throw new IllegalArgumentException("Unknown compression type: " + type); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index 10df9fd..ce1177e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -57,10 +57,10 @@ public final class Record { public static final byte CURRENT_MAGIC_VALUE = 0; /** - * Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no + * Specifies the mask for the compression code. 2 bits to hold the compression codec. 
0 is reserved to indicate no
 * compression
 */
- public static final int COMPRESSION_CODEC_MASK = 0x07;
+ public static final int COMPRESSION_CODEC_MASK = 0x03;
 /**
 * Compression code for uncompressed records
diff --git a/config/producer.properties b/config/producer.properties
index 39d65d7..52a7611 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -26,8 +26,8 @@ metadata.broker.list=localhost:9092
 # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
 producer.type=sync
-# specify the compression codec for all data generated: none, gzip, snappy, lz4, lz4hc.
-# the old config values work as well: 0, 1, 2, 3, 4 for none, gzip, snappy, lz4, lz4hc, respectivally
+# specify the compression codec for all data generated: none, gzip, snappy.
+# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectively
 compression.codec=none
 # message encoder
diff --git a/config/tools-log4j.properties b/config/tools-log4j.properties
index 52f07c9..ec851dd 100644
--- a/config/tools-log4j.properties
+++ b/config/tools-log4j.properties
@@ -13,8 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=WARN, stdout
+log4j.rootLogger=WARN, stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+
+log4j.appender.perfToolsAppender=org.apache.log4j.FileAppender
+log4j.appender.perfToolsAppender.File=${kafka.logs.dir}/perf.log
+log4j.appender.perfToolsAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.perfToolsAppender.layout.ConversionPattern=%m %n
+
+log4j.logger.kafka.tools.performance=INFO, perfToolsAppender
+log4j.additivity.state.change.logger=false
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
deleted file mode 100644
index 24c9287..0000000
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.consumer
-
-import scala.collection.JavaConversions._
-import org.I0Itec.zkclient._
-import joptsimple._
-import java.util.Properties
-import java.util.Random
-import java.io.PrintStream
-import kafka.message._
-import kafka.serializer._
-import kafka.utils._
-import kafka.metrics.KafkaMetricsReporter
-
-
-/**
- * Consumer that dumps messages out to standard out.
- * - */ -object ConsoleConsumer extends Logging { - - def main(args: Array[String]) { - val parser = new OptionParser - val topicIdOpt = parser.accepts("topic", "The topic id to consume on.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.") - .withRequiredArg - .describedAs("whitelist") - .ofType(classOf[String]) - val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.") - .withRequiredArg - .describedAs("blacklist") - .ofType(classOf[String]) - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - val groupIdOpt = parser.accepts("group", "The group id to consume on.") - .withRequiredArg - .describedAs("gid") - .defaultsTo("console-consumer-" + new Random().nextInt(100000)) - .ofType(classOf[String]) - val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024 * 1024) - val minFetchBytesOpt = parser.accepts("min-fetch-bytes", "The min number of bytes each fetch request waits for.") - .withRequiredArg - .describedAs("bytes") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(100) - val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(2 * 1024 * 1024) - val socketTimeoutMsOpt = parser.accepts("socket-timeout-ms", "The socket timeout used for the connection to the broker") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(ConsumerConfig.SocketTimeout) - val refreshMetadataBackoffMsOpt = parser.accepts("refresh-leader-backoff-ms", "Backoff time before refreshing metadata") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(ConsumerConfig.RefreshMetadataBackoffMs) - val consumerTimeoutMsOpt = parser.accepts("consumer-timeout-ms", "consumer throws timeout exception after waiting this much " + - "of time without incoming messages") - .withRequiredArg - .describedAs("prop") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(-1) - val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.") - .withRequiredArg - .describedAs("class") - .ofType(classOf[String]) - .defaultsTo(classOf[DefaultMessageFormatter].getName) - val messageFormatterArgOpt = parser.accepts("property") - .withRequiredArg - .describedAs("prop") - .ofType(classOf[String]) - val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up"); - val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + - "start with the earliest message present in the log rather than the latest message.") - val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the 
current offset in ms") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(ConsumerConfig.AutoCommitInterval) - val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.") - .withRequiredArg - .describedAs("num_messages") - .ofType(classOf[java.lang.Integer]) - val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + - "skip it instead of halt.") - val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") - val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + - "set, the csv metrics will be outputed here") - .withRequiredArg - .describedAs("metrics dictory") - .ofType(classOf[java.lang.String]) - val includeInternalTopicsOpt = parser.accepts("include-internal-topics", "Allow consuming internal topics.") - val offsetsStorageOpt = parser.accepts("offsets-storage", "Specify offsets storage backend (kafka/zookeeper).") - .withRequiredArg - .describedAs("Offsets storage method.") - .ofType(classOf[String]) - .defaultsTo("zookeeper") - val dualCommitEnabledOpt = parser.accepts("dual-commit-enabled", "If offsets storage is kafka and this is set, then commit to zookeeper as well.") - - val options: OptionSet = tryParse(parser, args) - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) - val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) - if (topicOrFilterOpt.size != 1) { - error("Exactly one of whitelist/blacklist/topic is required.") - parser.printHelpOn(System.err) - System.exit(1) - } - val topicArg = options.valueOf(topicOrFilterOpt.head) - val filterSpec = if (options.has(blacklistOpt)) - new Blacklist(topicArg) - else - new Whitelist(topicArg) - - val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) - if (csvMetricsReporterEnabled) { - val csvReporterProps = new Properties() - csvReporterProps.put("kafka.metrics.polling.interval.secs", "5") - csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") - if (options.has(metricsDirectoryOpt)) - csvReporterProps.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) - else - csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics") - csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true") - val verifiableProps = new VerifiableProperties(csvReporterProps) - KafkaMetricsReporter.startReporters(verifiableProps) - } - - if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) && - checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + options.valueOf(groupIdOpt)+ "/offsets")) { - System.err.println("Found previous offset information for this group "+options.valueOf(groupIdOpt) - +". 
Please use --delete-consumer-offsets to delete previous offsets metadata") - System.exit(1) - } - - if(options.has(deleteConsumerOffsetsOpt)) - ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) - - val offsetsStorage = options.valueOf(offsetsStorageOpt) - val props = new Properties() - props.put("group.id", options.valueOf(groupIdOpt)) - props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) - props.put("socket.timeout.ms", options.valueOf(socketTimeoutMsOpt).toString) - props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) - props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString) - props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString) - props.put("auto.commit.enable", "true") - props.put("auto.commit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString) - props.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") - props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) - props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString) - props.put("refresh.leader.backoff.ms", options.valueOf(refreshMetadataBackoffMsOpt).toString) - props.put("offsets.storage", offsetsStorage) - if (options.has(includeInternalTopicsOpt)) - props.put("exclude.internal.topics", "false") - if (options.has(dualCommitEnabledOpt)) - props.put("dual.commit.enabled", "true") - else - props.put("dual.commit.enabled", "false") - - val config = new ConsumerConfig(props) - val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false - - val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) - val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt)) - - val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 - - val connector = Consumer.create(config) - - - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run() { - connector.shutdown() - // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack - if(!options.has(groupIdOpt)) - ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) - } - }) - - var numMessages = 0L - val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] - formatter.init(formatterArgs) - try { - val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) - val iter = if(maxMessages >= 0) - stream.slice(0, maxMessages) - else - stream - - for(messageAndTopic <- iter) { - try { - formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) - numMessages += 1 - } catch { - case e: Throwable => - if (skipMessageOnError) - error("Error processing message, skipping this message: ", e) - else - throw e - } - if(System.out.checkError()) { - // This means no one is listening to our output stream any more, time to shutdown - System.err.println("Unable to write to standard out, closing consumer.") - System.err.println("Consumed %d messages".format(numMessages)) - formatter.close() - connector.shutdown() - System.exit(1) - } - } - } catch { - case e: Throwable => error("Error processing message, stopping consumer: ", e) - } - System.err.println("Consumed %d messages".format(numMessages)) - System.out.flush() - formatter.close() - connector.shutdown() - } - - 
def tryParse(parser: OptionParser, args: Array[String]) = { - try { - parser.parse(args : _*) - } catch { - case e: OptionException => { - Utils.croak(e.getMessage) - null - } - } - } - - def checkZkPathExists(zkUrl: String, path: String): Boolean = { - try { - val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer); - zk.exists(path) - } catch { - case _: Throwable => false - } - } -} - -object MessageFormatter { - def tryParseFormatterArgs(args: Iterable[String]): Properties = { - val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0) - if(!splits.forall(_.length == 2)) { - System.err.println("Invalid parser arguments: " + args.mkString(" ")) - System.exit(1) - } - val props = new Properties - for(a <- splits) - props.put(a(0), a(1)) - props - } -} - -trait MessageFormatter { - def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) - def init(props: Properties) {} - def close() {} -} - -class DefaultMessageFormatter extends MessageFormatter { - var printKey = false - var keySeparator = "\t".getBytes - var lineSeparator = "\n".getBytes - - override def init(props: Properties) { - if(props.containsKey("print.key")) - printKey = props.getProperty("print.key").trim.toLowerCase.equals("true") - if(props.containsKey("key.separator")) - keySeparator = props.getProperty("key.separator").getBytes - if(props.containsKey("line.separator")) - lineSeparator = props.getProperty("line.separator").getBytes - } - - def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { - if(printKey) { - output.write(if (key == null) "null".getBytes() else key) - output.write(keySeparator) - } - output.write(if (value == null) "null".getBytes() else value) - output.write(lineSeparator) - } -} - -class NoOpMessageFormatter extends MessageFormatter { - override def init(props: Properties) {} - def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {} -} - -class ChecksumMessageFormatter extends MessageFormatter { - private var topicStr: String = _ - - override def init(props: Properties) { - topicStr = props.getProperty("topic") - if (topicStr != null) - topicStr = topicStr + ":" - else - topicStr = "" - } - - def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { - val chksum = new Message(value, key).checksum - output.println(topicStr + "checksum:" + chksum) - } -} diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala index f384e04..d08c3f4 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala @@ -44,8 +44,6 @@ class TopicMetadata(private val underlying: kafka.api.TopicMetadata) { def errorCode: Short = underlying.errorCode def sizeInBytes: Int = underlying.sizeInBytes - - override def toString = underlying.toString } @@ -70,6 +68,5 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) { def errorCode: Short = underlying.errorCode def sizeInBytes: Int = underlying.sizeInBytes - - override def toString = underlying.toString } + diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala index 3359060..252a0c9 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala @@ -23,15 +23,4 @@ class TopicMetadataResponse(private val underlying: kafka.api.TopicMetadataRespo import 
kafka.javaapi.MetadataListImplicits._ underlying.topicsMetadata } - - override def equals(other: Any) = canEqual(other) && { - val otherTopicMetadataResponse = other.asInstanceOf[kafka.javaapi.TopicMetadataResponse] - this.underlying.equals(otherTopicMetadataResponse.underlying) - } - - def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.TopicMetadataResponse] - - override def hashCode = underlying.hashCode - - override def toString = underlying.toString } diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala index de0a0fa..8762a79 100644 --- a/core/src/main/scala/kafka/message/CompressionCodec.scala +++ b/core/src/main/scala/kafka/message/CompressionCodec.scala @@ -23,8 +23,6 @@ object CompressionCodec { case NoCompressionCodec.codec => NoCompressionCodec case GZIPCompressionCodec.codec => GZIPCompressionCodec case SnappyCompressionCodec.codec => SnappyCompressionCodec - case LZ4CompressionCodec.codec => LZ4CompressionCodec - case LZ4HCCompressionCodec.codec => LZ4HCCompressionCodec case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec)) } } @@ -33,8 +31,6 @@ object CompressionCodec { case NoCompressionCodec.name => NoCompressionCodec case GZIPCompressionCodec.name => GZIPCompressionCodec case SnappyCompressionCodec.name => SnappyCompressionCodec - case LZ4CompressionCodec.name => LZ4CompressionCodec - case LZ4HCCompressionCodec.name => LZ4HCCompressionCodec case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name)) } } @@ -57,16 +53,6 @@ case object SnappyCompressionCodec extends CompressionCodec { val name = "snappy" } -case object LZ4CompressionCodec extends CompressionCodec { - val codec = 3 - val name = "lz4" -} - -case object LZ4HCCompressionCodec extends CompressionCodec { - val codec = 4 - val name = "lz4hc" -} - case object NoCompressionCodec extends CompressionCodec { val codec = 0 val name = "none" diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala index 8420e13..ca833ee 100644 --- a/core/src/main/scala/kafka/message/CompressionFactory.scala +++ b/core/src/main/scala/kafka/message/CompressionFactory.scala @@ -31,12 +31,6 @@ object CompressionFactory { case SnappyCompressionCodec => import org.xerial.snappy.SnappyOutputStream new SnappyOutputStream(stream) - case LZ4CompressionCodec => - import net.jpountz.lz4.LZ4BlockOutputStream - new LZ4BlockOutputStream(stream) - case LZ4HCCompressionCodec => - import net.jpountz.lz4.{LZ4BlockOutputStream, LZ4Factory} - new LZ4BlockOutputStream(stream, 1 << 16, LZ4Factory.fastestInstance().highCompressor()) case _ => throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec) } @@ -49,9 +43,6 @@ object CompressionFactory { case SnappyCompressionCodec => import org.xerial.snappy.SnappyInputStream new SnappyInputStream(stream) - case LZ4CompressionCodec | LZ4HCCompressionCodec => - import net.jpountz.lz4.LZ4BlockInputStream - new LZ4BlockInputStream(stream) case _ => throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec) } diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index d2a7293..52c082f 100644 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -54,10 +54,10 @@ object Message { val CurrentMagicValue: Byte = 0 
/** - * Specifies the mask for the compression code. 3 bits to hold the compression codec. + * Specifies the mask for the compression code. 2 bits to hold the compression codec. * 0 is reserved to indicate no compression */ - val CompressionCodeMask: Int = 0x07 + val CompressionCodeMask: Int = 0x03 /** * Compression code for uncompressed messages diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 5417628..0e22897 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -30,7 +30,7 @@ import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import scala.Some import kafka.common.TopicAndPartition -import kafka.consumer.MessageFormatter +import kafka.tools.MessageFormatter import java.io.PrintStream import org.apache.kafka.common.protocol.types.{Struct, Schema, Field} import org.apache.kafka.common.protocol.types.Type.STRING @@ -247,7 +247,7 @@ class OffsetManager(val config: OffsetManagerConfig, * Asynchronously read the partition from the offsets topic and populate the cache */ def loadOffsetsFromLog(offsetsPartition: Int) { - + val topicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition) loadingPartitions synchronized { @@ -477,4 +477,3 @@ case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition) } - diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala new file mode 100644 index 0000000..82e4cb2 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import scala.collection.JavaConversions._ +import org.I0Itec.zkclient._ +import joptsimple._ +import java.util.Properties +import java.util.Random +import java.io.PrintStream +import kafka.message._ +import kafka.serializer._ +import kafka.utils._ +import kafka.metrics.KafkaMetricsReporter +import kafka.consumer.{ConsumerConfig,Blacklist,Whitelist,Consumer} + +/** + * Consumer that dumps messages out to standard out. 
+ * + */ +object ConsoleConsumer extends Logging { + + def main(args: Array[String]) { + val parser = new OptionParser + val topicIdOpt = parser.accepts("topic", "The topic id to consume on.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.") + .withRequiredArg + .describedAs("whitelist") + .ofType(classOf[String]) + val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.") + .withRequiredArg + .describedAs("blacklist") + .ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val groupIdOpt = parser.accepts("group", "The group id to consume on.") + .withRequiredArg + .describedAs("gid") + .defaultsTo("console-consumer-" + new Random().nextInt(100000)) + .ofType(classOf[String]) + val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024 * 1024) + val minFetchBytesOpt = parser.accepts("min-fetch-bytes", "The min number of bytes each fetch request waits for.") + .withRequiredArg + .describedAs("bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.") + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100) + val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(2 * 1024 * 1024) + val socketTimeoutMsOpt = parser.accepts("socket-timeout-ms", "The socket timeout used for the connection to the broker") + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(ConsumerConfig.SocketTimeout) + val refreshMetadataBackoffMsOpt = parser.accepts("refresh-leader-backoff-ms", "Backoff time before refreshing metadata") + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(ConsumerConfig.RefreshMetadataBackoffMs) + val consumerTimeoutMsOpt = parser.accepts("consumer-timeout-ms", "consumer throws timeout exception after waiting this much " + + "of time without incoming messages") + .withRequiredArg + .describedAs("prop") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(-1) + val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.") + .withRequiredArg + .describedAs("class") + .ofType(classOf[String]) + .defaultsTo(classOf[DefaultMessageFormatter].getName) + val messageFormatterArgOpt = parser.accepts("property") + .withRequiredArg + .describedAs("prop") + .ofType(classOf[String]) + val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up"); + val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " + + "start with the earliest message present in the log rather than the latest message.") + val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the 
current offset in ms") + .withRequiredArg + .describedAs("ms") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(ConsumerConfig.AutoCommitInterval) + val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.") + .withRequiredArg + .describedAs("num_messages") + .ofType(classOf[java.lang.Integer]) + val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " + + "skip it instead of halt.") + val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") + val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + + "set, the csv metrics will be outputed here") + .withRequiredArg + .describedAs("metrics dictory") + .ofType(classOf[java.lang.String]) + val includeInternalTopicsOpt = parser.accepts("include-internal-topics", "Allow consuming internal topics.") + val offsetsStorageOpt = parser.accepts("offsets-storage", "Specify offsets storage backend (kafka/zookeeper).") + .withRequiredArg + .describedAs("Offsets storage method.") + .ofType(classOf[String]) + .defaultsTo("zookeeper") + val dualCommitEnabledOpt = parser.accepts("dual-commit-enabled", "If offsets storage is kafka and this is set, then commit to zookeeper as well.") + + val options: OptionSet = tryParse(parser, args) + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has) + if (topicOrFilterOpt.size != 1) { + error("Exactly one of whitelist/blacklist/topic is required.") + parser.printHelpOn(System.err) + System.exit(1) + } + val topicArg = options.valueOf(topicOrFilterOpt.head) + val filterSpec = if (options.has(blacklistOpt)) + new Blacklist(topicArg) + else + new Whitelist(topicArg) + + val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) + if (csvMetricsReporterEnabled) { + val csvReporterProps = new Properties() + csvReporterProps.put("kafka.metrics.polling.interval.secs", "5") + csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") + if (options.has(metricsDirectoryOpt)) + csvReporterProps.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) + else + csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics") + csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true") + val verifiableProps = new VerifiableProperties(csvReporterProps) + KafkaMetricsReporter.startReporters(verifiableProps) + } + + if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) && + checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + options.valueOf(groupIdOpt)+ "/offsets")) { + System.err.println("Found previous offset information for this group "+options.valueOf(groupIdOpt) + +". 
Please use --delete-consumer-offsets to delete previous offsets metadata") + System.exit(1) + } + + if(options.has(deleteConsumerOffsetsOpt)) + ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) + + val offsetsStorage = options.valueOf(offsetsStorageOpt) + val props = new Properties() + props.put("group.id", options.valueOf(groupIdOpt)) + props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) + props.put("socket.timeout.ms", options.valueOf(socketTimeoutMsOpt).toString) + props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) + props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString) + props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString) + props.put("auto.commit.enable", "true") + props.put("auto.commit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString) + props.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest") + props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) + props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString) + props.put("refresh.leader.backoff.ms", options.valueOf(refreshMetadataBackoffMsOpt).toString) + props.put("offsets.storage", offsetsStorage) + if (options.has(includeInternalTopicsOpt)) + props.put("exclude.internal.topics", "false") + if (options.has(dualCommitEnabledOpt)) + props.put("dual.commit.enabled", "true") + else + props.put("dual.commit.enabled", "false") + + val config = new ConsumerConfig(props) + val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false + + val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) + val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt)) + + val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1 + + val connector = Consumer.create(config) + + + Runtime.getRuntime.addShutdownHook(new Thread() { + override def run() { + connector.shutdown() + // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack + if(!options.has(groupIdOpt)) + ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt)) + } + }) + + var numMessages = 0L + val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter] + formatter.init(formatterArgs) + try { + val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0) + val iter = if(maxMessages >= 0) + stream.slice(0, maxMessages) + else + stream + + for(messageAndTopic <- iter) { + try { + formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out) + numMessages += 1 + } catch { + case e: Throwable => + if (skipMessageOnError) + error("Error processing message, skipping this message: ", e) + else + throw e + } + if(System.out.checkError()) { + // This means no one is listening to our output stream any more, time to shutdown + System.err.println("Unable to write to standard out, closing consumer.") + System.err.println("Consumed %d messages".format(numMessages)) + formatter.close() + connector.shutdown() + System.exit(1) + } + } + } catch { + case e: Throwable => error("Error processing message, stopping consumer: ", e) + } + System.err.println("Consumed %d messages".format(numMessages)) + System.out.flush() + formatter.close() + connector.shutdown() + } + + 
def tryParse(parser: OptionParser, args: Array[String]) = { + try { + parser.parse(args : _*) + } catch { + case e: OptionException => { + Utils.croak(e.getMessage) + null + } + } + } + + def checkZkPathExists(zkUrl: String, path: String): Boolean = { + try { + val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer); + zk.exists(path) + } catch { + case _: Throwable => false + } + } +} + +object MessageFormatter { + def tryParseFormatterArgs(args: Iterable[String]): Properties = { + val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0) + if(!splits.forall(_.length == 2)) { + System.err.println("Invalid parser arguments: " + args.mkString(" ")) + System.exit(1) + } + val props = new Properties + for(a <- splits) + props.put(a(0), a(1)) + props + } +} + +trait MessageFormatter { + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) + def init(props: Properties) {} + def close() {} +} + +class DefaultMessageFormatter extends MessageFormatter { + var printKey = false + var keySeparator = "\t".getBytes + var lineSeparator = "\n".getBytes + + override def init(props: Properties) { + if(props.containsKey("print.key")) + printKey = props.getProperty("print.key").trim.toLowerCase.equals("true") + if(props.containsKey("key.separator")) + keySeparator = props.getProperty("key.separator").getBytes + if(props.containsKey("line.separator")) + lineSeparator = props.getProperty("line.separator").getBytes + } + + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { + if(printKey) { + output.write(if (key == null) "null".getBytes() else key) + output.write(keySeparator) + } + output.write(if (value == null) "null".getBytes() else value) + output.write(lineSeparator) + } +} + +class NoOpMessageFormatter extends MessageFormatter { + override def init(props: Properties) {} + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {} +} + +class ChecksumMessageFormatter extends MessageFormatter { + private var topicStr: String = _ + + override def init(props: Properties) { + topicStr = props.getProperty("topic") + if (topicStr != null) + topicStr = topicStr + ":" + else + topicStr = "" + } + + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { + val chksum = new Message(value, key).checksum + output.println(topicStr + "checksum:" + chksum) + } +} diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala new file mode 100644 index 0000000..a66cc82 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicLong +import java.nio.channels.ClosedByInterruptException +import org.apache.log4j.Logger +import kafka.message.Message +import kafka.utils.ZkUtils +import java.util.{ Random, Properties } +import kafka.consumer._ +import java.text.SimpleDateFormat + +/** + * Performance test for the full zookeeper consumer + */ +object ConsumerPerformance { + private val logger = Logger.getLogger("kafka.tools.performance") + + def main(args: Array[String]): Unit = { + + val config = new ConsumerPerfConfig(args) + logger.info("Starting consumer...") + var totalMessagesRead = new AtomicLong(0) + var totalBytesRead = new AtomicLong(0) + + if (!config.hideHeader) { + if (!config.showDetailedStats) + println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") + else + println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") + } + + // clean up zookeeper state for this group id for every perf run + ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId) + + val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig) + + val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads)) + var threadList = List[ConsumerPerfThread]() + for ((topic, streamList) <- topicMessageStreams) + for (i <- 0 until streamList.length) + threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, + totalMessagesRead, totalBytesRead) + + logger.info("Sleeping for 1 second.") + Thread.sleep(1000) + logger.info("starting threads") + val startMs = System.currentTimeMillis + for (thread <- threadList) + thread.start + + for (thread <- threadList) + thread.join + + val endMs = System.currentTimeMillis + val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0 + if (!config.showDetailedStats) { + val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024) + println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs), + config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, + totalMessagesRead.get / elapsedSecs)) + } + System.exit(0) + } + + class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. 
" + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val groupIdOpt = parser.accepts("group", "The group id to consume on.") + .withRequiredArg + .describedAs("gid") + .defaultsTo("perf-consumer-" + new Random().nextInt(100000)) + .ofType(classOf[String]) + val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024 * 1024) + val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + + "offset to consume from, start with the latest message present in the log rather than the earliest message.") + val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(2 * 1024 * 1024) + val numThreadsOpt = parser.accepts("threads", "Number of processing threads.") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(10) + val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + + val options = parser.parse(args: _*) + + for (arg <- List(topicOpt, zkConnectOpt)) { + if (!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + val props = new Properties + props.put("group.id", options.valueOf(groupIdOpt)) + props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) + props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) + props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest") + props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) + props.put("consumer.timeout.ms", "5000") + props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString) + val consumerConfig = new ConsumerConfig(props) + val numThreads = options.valueOf(numThreadsOpt).intValue + val topic = options.valueOf(topicOpt) + val numMessages = options.valueOf(numMessagesOpt).longValue + val reportingInterval = options.valueOf(reportingIntervalOpt).intValue + val showDetailedStats = options.has(showDetailedStatsOpt) + val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) + val hideHeader = options.has(hideHeaderOpt) + } + + class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]], + config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) + extends Thread(name) { + + override def run() { + var bytesRead = 0L + var messagesRead = 0L + val startMs = System.currentTimeMillis + var lastReportTime: Long = startMs + var lastBytesRead = 0L + var lastMessagesRead = 0L + + try { + val iter = stream.iterator + while (iter.hasNext && messagesRead < config.numMessages) { + val messageAndMetadata = iter.next + messagesRead += 1 + bytesRead += messageAndMetadata.message.length + + if (messagesRead % config.reportingInterval == 0) { + if (config.showDetailedStats) + printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, 
lastReportTime, System.currentTimeMillis) + lastReportTime = System.currentTimeMillis + lastMessagesRead = messagesRead + lastBytesRead = bytesRead + } + } + } catch { + case _: InterruptedException => + case _: ClosedByInterruptException => + case _: ConsumerTimeoutException => + case e: Throwable => e.printStackTrace() + } + totalMessagesRead.addAndGet(messagesRead) + totalBytesRead.addAndGet(bytesRead) + if (config.showDetailedStats) + printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis) + } + + private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long, + startMs: Long, endMs: Long) = { + val elapsedMs = endMs - startMs + val totalMBRead = (bytesRead * 1.0) / (1024 * 1024) + val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024) + println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id, + config.consumerConfig.fetchMessageMaxBytes, totalMBRead, + 1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0)) + } + } + +} diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala new file mode 100644 index 0000000..129cc01 --- /dev/null +++ b/core/src/main/scala/kafka/tools/PerfConfig.scala @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package kafka.tools + +import joptsimple.OptionParser + + +class PerfConfig(args: Array[String]) { + val parser = new OptionParser + val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume") + .withRequiredArg + .describedAs("count") + .ofType(classOf[java.lang.Long]) + .defaultsTo(Long.MaxValue) + val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(5000) + val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. 
" + + "See java.text.SimpleDateFormat for options.") + .withRequiredArg + .describedAs("date format") + .ofType(classOf[String]) + .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS") + val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + + "interval as configured by reporting-interval") + val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ") + val messageSizeOpt = parser.accepts("message-size", "The size of each message.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100) + val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(200) + val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") + .withRequiredArg + .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val helpOpt = parser.accepts("help", "Print usage.") +} diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala new file mode 100644 index 0000000..240efc4 --- /dev/null +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools + +import kafka.metrics.KafkaMetricsReporter +import kafka.producer.{OldProducer, NewShinyProducer} +import kafka.utils.{VerifiableProperties, Logging} +import kafka.message.CompressionCodec +import kafka.serializer._ + +import java.util.concurrent.{CountDownLatch, Executors} +import java.util.concurrent.atomic.AtomicLong +import java.util._ +import java.text.SimpleDateFormat +import java.math.BigInteger +import scala.collection.immutable.List + +import org.apache.log4j.Logger + +/** + * Load test for the producer + */ +object ProducerPerformance extends Logging { + + def main(args: Array[String]) { + + val logger = Logger.getLogger("kafka.tools.performance") + val config = new ProducerPerfConfig(args) + if (!config.isFixedSize) + logger.info("WARN: Throughput will be slower due to changing message size per request") + + val totalBytesSent = new AtomicLong(0) + val totalMessagesSent = new AtomicLong(0) + val executor = Executors.newFixedThreadPool(config.numThreads) + val allDone = new CountDownLatch(config.numThreads) + val startMs = System.currentTimeMillis + val rand = new java.util.Random + + if (!config.hideHeader) + println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " + + "total.data.sent.in.nMsg, nMsg.sec") + + for (i <- 0 until config.numThreads) { + executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand)) + } + + allDone.await() + val endMs = System.currentTimeMillis + val elapsedSecs = (endMs - startMs) / 1000.0 + val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024) + println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format( + config.dateFormat.format(startMs), config.dateFormat.format(endMs), + config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent, + totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs)) + System.exit(0) + } + + class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) { + val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap).") + .withRequiredArg + .describedAs("hostname:port,..,hostname:port") + .ofType(classOf[String]) + val topicsOpt = parser.accepts("topics", "REQUIRED: The comma-separated list of topics to produce to") + .withRequiredArg + .describedAs("topic1,topic2..") + .ofType(classOf[String]) + val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3000) + val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The number of producer retries") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(3) + val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100) + val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " + + "to complete") + .withRequiredArg() + .ofType(classOf[java.lang.Integer]) + .defaultsTo(-1) + val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.") + val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.") + val numThreadsOpt = parser.accepts("threads", "Number of sending threads.") + .withRequiredArg + .describedAs("number 
of threads") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1) + val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " + + "ID and sent by producer starting from this ID sequentially. Message content will be String type and " + + "in the form of 'Message:000...1:xxx...'") + .withRequiredArg() + .describedAs("initial message id") + .ofType(classOf[java.lang.Integer]) + val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends") + .withRequiredArg() + .describedAs("message send time gap") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") + val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + + "set, the csv metrics will be outputed here") + .withRequiredArg + .describedAs("metrics dictory") + .ofType(classOf[java.lang.String]) + val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") + + val options = parser.parse(args: _*) + for (arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) { + if (!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + val topicsStr = options.valueOf(topicsOpt) + val topics = topicsStr.split(",") + val numMessages = options.valueOf(numMessagesOpt).longValue + val reportingInterval = options.valueOf(reportingIntervalOpt).intValue + val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) + val hideHeader = options.has(hideHeaderOpt) + val brokerList = options.valueOf(brokerListOpt) + val messageSize = options.valueOf(messageSizeOpt).intValue + var isFixedSize = !options.has(varyMessageSizeOpt) + var isSync = options.has(syncOpt) + var batchSize = options.valueOf(batchSizeOpt).intValue + var numThreads = options.valueOf(numThreadsOpt).intValue + val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue) + val seqIdMode = options.has(initialMessageIdOpt) + var initialMessageId: Int = 0 + if (seqIdMode) + initialMessageId = options.valueOf(initialMessageIdOpt).intValue() + val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue() + val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue() + val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue() + val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue() + val useNewProducer = options.has(useNewProducerOpt) + + val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) + + if (csvMetricsReporterEnabled) { + val props = new Properties() + props.put("kafka.metrics.polling.interval.secs", "1") + props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") + if (options.has(metricsDirectoryOpt)) + props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) + else + props.put("kafka.csv.metrics.dir", "kafka_metrics") + props.put("kafka.csv.metrics.reporter.enabled", "true") + val verifiableProps = new VerifiableProperties(props) + KafkaMetricsReporter.startReporters(verifiableProps) + } + + val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue() + } + + class ProducerThread(val threadId: Int, + val config: 
ProducerPerfConfig, + val totalBytesSent: AtomicLong, + val totalMessagesSent: AtomicLong, + val allDone: CountDownLatch, + val rand: Random) extends Runnable { + val seqIdNumDigit = 10 // no. of digits for max int value + + val messagesPerThread = config.numMessages / config.numThreads + debug("Messages per thread = " + messagesPerThread) + val props = new Properties() + val producer = + if (config.useNewProducer) { + import org.apache.kafka.clients.producer.ProducerConfig + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) + props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) + props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance") + props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString) + props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString) + props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) + props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) + new NewShinyProducer(props) + } else { + props.put("metadata.broker.list", config.brokerList) + props.put("compression.codec", config.compressionCodec.codec.toString) + props.put("send.buffer.bytes", (64 * 1024).toString) + if (!config.isSync) { + props.put("producer.type", "async") + props.put("batch.num.messages", config.batchSize.toString) + props.put("queue.enqueue.timeout.ms", "-1") + } + props.put("client.id", "producer-performance") + props.put("request.required.acks", config.producerRequestRequiredAcks.toString) + props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) + props.put("message.send.max.retries", config.producerNumRetries.toString) + props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) + props.put("serializer.class", classOf[DefaultEncoder].getName) + props.put("key.serializer.class", classOf[NullEncoder[Long]].getName) + new OldProducer(props) + } + + // generate the sequential message ID + private val SEP = ":" // message field separator + private val messageIdLabel = "MessageID" + private val threadIdLabel = "ThreadID" + private val topicLabel = "Topic" + private var leftPaddedSeqId: String = "" + + private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = { + // Each thread gets a unique range of sequential no. for its ids. + // Eg. 1000 msg in 10 threads => 100 msg per thread + // thread 0 IDs : 0 ~ 99 + // thread 1 IDs : 100 ~ 199 + // thread 2 IDs : 200 ~ 299 + // . . . 
+ leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId)) + + val msgHeader = topicLabel + SEP + + topic + SEP + + threadIdLabel + SEP + + threadId + SEP + + messageIdLabel + SEP + + leftPaddedSeqId + SEP + + val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x') + debug(seqMsgString) + return seqMsgString.getBytes() + } + + private def generateProducerData(topic: String, messageId: Long): Array[Byte] = { + val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize) + if (config.seqIdMode) { + val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId + generateMessageWithSeqId(topic, seqId, msgSize) + } else { + new Array[Byte](msgSize) + } + } + + override def run { + var bytesSent = 0L + var nSends = 0 + var i: Long = 0L + var message: Array[Byte] = null + + while (i < messagesPerThread) { + try { + config.topics.foreach( + topic => { + message = generateProducerData(topic, i) + producer.send(topic, BigInteger.valueOf(i).toByteArray, message) + bytesSent += message.size + nSends += 1 + if (config.messageSendGapMs > 0) + Thread.sleep(config.messageSendGapMs) + }) + } catch { + case e: Throwable => error("Error when sending message " + new String(message), e) + } + i += 1 + } + try { + producer.close() + } catch { + case e: Throwable => error("Error when closing producer", e) + } + totalBytesSent.addAndGet(bytesSent) + totalMessagesSent.addAndGet(nSends) + allDone.countDown() + } + } +} diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala new file mode 100644 index 0000000..6fb5bb0 --- /dev/null +++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools + +import java.net.URI +import java.text.SimpleDateFormat +import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} +import kafka.consumer.SimpleConsumer +import kafka.utils._ +import org.apache.log4j.Logger +import kafka.common.TopicAndPartition + + +/** + * Performance test for the simple consumer + */ +object SimpleConsumerPerformance { + + def main(args: Array[String]) { + val logger = Logger.getLogger("kafka.tools.performance") + val config = new ConsumerPerfConfig(args) + + if(!config.hideHeader) { + if(!config.showDetailedStats) + println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") + else + println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") + } + + val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId) + + // reset to latest or smallest offset + val topicAndPartition = TopicAndPartition(config.topic, config.partition) + val request = OffsetRequest(Map( + topicAndPartition -> PartitionOffsetRequestInfo(if (config.fromLatest) OffsetRequest.LatestTime else OffsetRequest.EarliestTime, 1) + )) + var offset: Long = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head + + val startMs = System.currentTimeMillis + var done = false + var totalBytesRead = 0L + var totalMessagesRead = 0L + var consumedInterval = 0 + var lastReportTime: Long = startMs + var lastBytesRead = 0L + var lastMessagesRead = 0L + while(!done) { + // TODO: add in the maxWait and minBytes for performance + val request = new FetchRequestBuilder() + .clientId(config.clientId) + .addFetch(config.topic, config.partition, offset, config.fetchSize) + .build() + val fetchResponse = consumer.fetch(request) + + var messagesRead = 0 + var bytesRead = 0 + val messageSet = fetchResponse.messageSet(config.topic, config.partition) + for (message <- messageSet) { + messagesRead += 1 + bytesRead += message.message.payloadSize + } + + if(messagesRead == 0 || totalMessagesRead > config.numMessages) + done = true + else + // we only did one fetch so we find the offset for the first (head) messageset + offset += messageSet.validBytes + + totalBytesRead += bytesRead + totalMessagesRead += messagesRead + consumedInterval += messagesRead + + if(consumedInterval > config.reportingInterval) { + if(config.showDetailedStats) { + val reportTime = System.currentTimeMillis + val elapsed = (reportTime - lastReportTime)/1000.0 + val totalMBRead = ((totalBytesRead-lastBytesRead)*1.0)/(1024*1024) + println(("%s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(reportTime), config.fetchSize, + (totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed, + totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed)) + } + lastReportTime = SystemTime.milliseconds + lastBytesRead = totalBytesRead + lastMessagesRead = totalMessagesRead + consumedInterval = 0 + } + } + val reportTime = System.currentTimeMillis + val elapsed = (reportTime - startMs) / 1000.0 + + if(!config.showDetailedStats) { + val totalMBRead = (totalBytesRead*1.0)/(1024*1024) + println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), + config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, totalMBRead/elapsed, + totalMessagesRead, totalMessagesRead/elapsed)) + } + System.exit(0) + } + + class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { + val urlOpt = 
parser.accepts("server", "REQUIRED: The hostname of the server to connect to.") + .withRequiredArg + .describedAs("kafka://hostname:port") + .ofType(classOf[String]) + val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + + "offset to consume from, start with the latest message present in the log rather than the earliest message.") + val partitionOpt = parser.accepts("partition", "The topic partition to consume from.") + .withRequiredArg + .describedAs("partition") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(0) + val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.") + .withRequiredArg + .describedAs("bytes") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024*1024) + val clientIdOpt = parser.accepts("clientId", "The ID of this client.") + .withRequiredArg + .describedAs("clientId") + .ofType(classOf[String]) + .defaultsTo("SimpleConsumerPerformanceClient") + + val options = parser.parse(args : _*) + + for(arg <- List(topicOpt, urlOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + val url = new URI(options.valueOf(urlOpt)) + val fetchSize = options.valueOf(fetchSizeOpt).intValue + val fromLatest = options.has(resetBeginningOffsetOpt) + val partition = options.valueOf(partitionOpt).intValue + val topic = options.valueOf(topicOpt) + val numMessages = options.valueOf(numMessagesOpt).longValue + val reportingInterval = options.valueOf(reportingIntervalOpt).intValue + val showDetailedStats = options.has(showDetailedStatsOpt) + val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) + val hideHeader = options.has(hideHeaderOpt) + val clientId = options.valueOf(clientIdOpt).toString + } +} diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala index 6f0addc..ed22931 100644 --- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala @@ -30,10 +30,6 @@ class MessageCompressionTest extends JUnitSuite { val codecs = mutable.ArrayBuffer[CompressionCodec](GZIPCompressionCodec) if(isSnappyAvailable) codecs += SnappyCompressionCodec - if(isLZ4Available) - codecs += LZ4CompressionCodec - if (izLZ4HCAvailable) - codecs += LZ4HCCompressionCodec for(codec <- codecs) testSimpleCompressDecompress(codec) } @@ -65,23 +61,4 @@ class MessageCompressionTest extends JUnitSuite { case e: org.xerial.snappy.SnappyError => false } } - - def isLZ4Available(): Boolean = { - try { - val lz4 = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream()) - true - } catch { - case e: UnsatisfiedLinkError => false - } - } - - def izLZ4HCAvailable(): Boolean = { - try { - val lz4hc = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream(), 1 << 16, - net.jpountz.lz4.LZ4Factory.fastestInstance().highCompressor()) - true - } catch { - case e: UnsatisfiedLinkError => false - } - } } diff --git a/perf/config/log4j.properties b/perf/config/log4j.properties deleted file mode 100644 index 542b739..0000000 --- a/perf/config/log4j.properties +++ /dev/null @@ -1,24 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# 
contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -log4j.rootLogger=INFO, fileAppender - -log4j.appender.fileAppender=org.apache.log4j.FileAppender -log4j.appender.fileAppender.File=perf.log -log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout -log4j.appender.fileAppender.layout.ConversionPattern=%m %n - -# Turn on all our debugging info -log4j.logger.kafka=INFO - diff --git a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala deleted file mode 100644 index 4dde468..0000000 --- a/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.perf - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicLong -import java.nio.channels.ClosedByInterruptException -import org.apache.log4j.Logger -import kafka.message.Message -import kafka.utils.ZkUtils -import java.util.{ Random, Properties } -import kafka.consumer._ -import java.text.SimpleDateFormat - -/** - * Performance test for the full zookeeper consumer - */ -object ConsumerPerformance { - private val logger = Logger.getLogger(getClass()) - - def main(args: Array[String]): Unit = { - - val config = new ConsumerPerfConfig(args) - logger.info("Starting consumer...") - var totalMessagesRead = new AtomicLong(0) - var totalBytesRead = new AtomicLong(0) - - if (!config.hideHeader) { - if (!config.showDetailedStats) - println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") - else - println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") - } - - // clean up zookeeper state for this group id for every perf run - ZkUtils.maybeDeletePath(config.consumerConfig.zkConnect, "/consumers/" + config.consumerConfig.groupId) - - val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig) - - val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads)) - var threadList = List[ConsumerPerfThread]() - for ((topic, streamList) <- topicMessageStreams) - for (i <- 0 until streamList.length) - threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, - totalMessagesRead, totalBytesRead) - - logger.info("Sleeping for 1 second.") - Thread.sleep(1000) - logger.info("starting threads") - val startMs = System.currentTimeMillis - for (thread <- threadList) - thread.start - - for (thread <- threadList) - thread.join - - val endMs = System.currentTimeMillis - val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0 - if (!config.showDetailedStats) { - val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024) - println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs), - config.consumerConfig.fetchMessageMaxBytes, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, - totalMessagesRead.get / elapsedSecs)) - } - System.exit(0) - } - - class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. 
" + - "Multiple URLS can be given to allow fail-over.") - .withRequiredArg - .describedAs("urls") - .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val groupIdOpt = parser.accepts("group", "The group id to consume on.") - .withRequiredArg - .describedAs("gid") - .defaultsTo("perf-consumer-" + new Random().nextInt(100000)) - .ofType(classOf[String]) - val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024 * 1024) - val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + - "offset to consume from, start with the latest message present in the log rather than the earliest message.") - val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(2 * 1024 * 1024) - val numThreadsOpt = parser.accepts("threads", "Number of processing threads.") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(10) - val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - - val options = parser.parse(args: _*) - - for (arg <- List(topicOpt, zkConnectOpt)) { - if (!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - - val props = new Properties - props.put("group.id", options.valueOf(groupIdOpt)) - props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString) - props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString) - props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest") - props.put("zookeeper.connect", options.valueOf(zkConnectOpt)) - props.put("consumer.timeout.ms", "5000") - props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString) - val consumerConfig = new ConsumerConfig(props) - val numThreads = options.valueOf(numThreadsOpt).intValue - val topic = options.valueOf(topicOpt) - val numMessages = options.valueOf(numMessagesOpt).longValue - val reportingInterval = options.valueOf(reportingIntervalOpt).intValue - val showDetailedStats = options.has(showDetailedStatsOpt) - val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) - val hideHeader = options.has(hideHeaderOpt) - } - - class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]], - config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) - extends Thread(name) { - - override def run() { - var bytesRead = 0L - var messagesRead = 0L - val startMs = System.currentTimeMillis - var lastReportTime: Long = startMs - var lastBytesRead = 0L - var lastMessagesRead = 0L - - try { - val iter = stream.iterator - while (iter.hasNext && messagesRead < config.numMessages) { - val messageAndMetadata = iter.next - messagesRead += 1 - bytesRead += messageAndMetadata.message.length - - if (messagesRead % config.reportingInterval == 0) { - if (config.showDetailedStats) - printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, 
lastReportTime, System.currentTimeMillis) - lastReportTime = System.currentTimeMillis - lastMessagesRead = messagesRead - lastBytesRead = bytesRead - } - } - } catch { - case _: InterruptedException => - case _: ClosedByInterruptException => - case _: ConsumerTimeoutException => - case e: Throwable => e.printStackTrace() - } - totalMessagesRead.addAndGet(messagesRead) - totalBytesRead.addAndGet(bytesRead) - if (config.showDetailedStats) - printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis) - } - - private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long, - startMs: Long, endMs: Long) = { - val elapsedMs = endMs - startMs - val totalMBRead = (bytesRead * 1.0) / (1024 * 1024) - val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024) - println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id, - config.consumerConfig.fetchMessageMaxBytes, totalMBRead, - 1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0)) - } - } - -} diff --git a/perf/src/main/scala/kafka/perf/PerfConfig.scala b/perf/src/main/scala/kafka/perf/PerfConfig.scala deleted file mode 100644 index a8fc6b9..0000000 --- a/perf/src/main/scala/kafka/perf/PerfConfig.scala +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package kafka.perf - -import joptsimple.OptionParser - - -class PerfConfig(args: Array[String]) { - val parser = new OptionParser - val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume") - .withRequiredArg - .describedAs("count") - .ofType(classOf[java.lang.Long]) - .defaultsTo(Long.MaxValue) - val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(5000) - val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. 
" + - "See java.text.SimpleDateFormat for options.") - .withRequiredArg - .describedAs("date format") - .ofType(classOf[String]) - .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS") - val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + - "interval as configured by reporting-interval") - val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ") - val messageSizeOpt = parser.accepts("message-size", "The size of each message.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(100) - val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.") - .withRequiredArg - .describedAs("size") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(200) - val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") - .withRequiredArg - .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) - val helpOpt = parser.accepts("help", "Print usage.") -} diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala deleted file mode 100644 index 00fa90b..0000000 --- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.perf - -import kafka.metrics.KafkaMetricsReporter -import kafka.producer.{OldProducer, NewShinyProducer} -import kafka.utils.{VerifiableProperties, Logging} -import kafka.message.CompressionCodec -import kafka.serializer._ - -import java.util.concurrent.{CountDownLatch, Executors} -import java.util.concurrent.atomic.AtomicLong -import java.util._ -import java.text.SimpleDateFormat -import java.math.BigInteger -import scala.collection.immutable.List - -import org.apache.log4j.Logger - -/** - * Load test for the producer - */ -object ProducerPerformance extends Logging { - - def main(args: Array[String]) { - - val logger = Logger.getLogger(getClass) - val config = new ProducerPerfConfig(args) - if (!config.isFixedSize) - logger.info("WARN: Throughput will be slower due to changing message size per request") - - val totalBytesSent = new AtomicLong(0) - val totalMessagesSent = new AtomicLong(0) - val executor = Executors.newFixedThreadPool(config.numThreads) - val allDone = new CountDownLatch(config.numThreads) - val startMs = System.currentTimeMillis - val rand = new java.util.Random - - if (!config.hideHeader) - println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " + - "total.data.sent.in.nMsg, nMsg.sec") - - for (i <- 0 until config.numThreads) { - executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand)) - } - - allDone.await() - val endMs = System.currentTimeMillis - val elapsedSecs = (endMs - startMs) / 1000.0 - val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024) - println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format( - config.dateFormat.format(startMs), config.dateFormat.format(endMs), - config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent, - totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs)) - System.exit(0) - } - - class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) { - val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap.") - .withRequiredArg - .describedAs("hostname:port,..,hostname:port") - .ofType(classOf[String]) - val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to") - .withRequiredArg - .describedAs("topic1,topic2..") - .ofType(classOf[String]) - val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(3000) - val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(3) - val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(100) - val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " + - "to complete") - .withRequiredArg() - .ofType(classOf[java.lang.Integer]) - .defaultsTo(-1) - val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.") - val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.") - val numThreadsOpt = parser.accepts("threads", "Number of sending threads.") - .withRequiredArg - .describedAs("number of threads") - 
.ofType(classOf[java.lang.Integer]) - .defaultsTo(1) - val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " + - "ID and sent by producer starting from this ID sequentially. Message content will be String type and " + - "in the form of 'Message:000...1:xxx...'") - .withRequiredArg() - .describedAs("initial message id") - .ofType(classOf[java.lang.Integer]) - val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends") - .withRequiredArg() - .describedAs("message send time gap") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) - val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled") - val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" + - "set, the csv metrics will be outputed here") - .withRequiredArg - .describedAs("metrics dictory") - .ofType(classOf[java.lang.String]) - val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") - - val options = parser.parse(args: _*) - for (arg <- List(topicsOpt, brokerListOpt, numMessagesOpt)) { - if (!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - val topicsStr = options.valueOf(topicsOpt) - val topics = topicsStr.split(",") - val numMessages = options.valueOf(numMessagesOpt).longValue - val reportingInterval = options.valueOf(reportingIntervalOpt).intValue - val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) - val hideHeader = options.has(hideHeaderOpt) - val brokerList = options.valueOf(brokerListOpt) - val messageSize = options.valueOf(messageSizeOpt).intValue - var isFixedSize = !options.has(varyMessageSizeOpt) - var isSync = options.has(syncOpt) - var batchSize = options.valueOf(batchSizeOpt).intValue - var numThreads = options.valueOf(numThreadsOpt).intValue - val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue) - val seqIdMode = options.has(initialMessageIdOpt) - var initialMessageId: Int = 0 - if (seqIdMode) - initialMessageId = options.valueOf(initialMessageIdOpt).intValue() - val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue() - val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue() - val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue() - val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue() - val useNewProducer = options.has(useNewProducerOpt) - - val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt) - - if (csvMetricsReporterEnabled) { - val props = new Properties() - props.put("kafka.metrics.polling.interval.secs", "1") - props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter") - if (options.has(metricsDirectoryOpt)) - props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt)) - else - props.put("kafka.csv.metrics.dir", "kafka_metrics") - props.put("kafka.csv.metrics.reporter.enabled", "true") - val verifiableProps = new VerifiableProperties(props) - KafkaMetricsReporter.startReporters(verifiableProps) - } - - val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue() - } - - class ProducerThread(val threadId: Int, - val config: ProducerPerfConfig, 
- val totalBytesSent: AtomicLong, - val totalMessagesSent: AtomicLong, - val allDone: CountDownLatch, - val rand: Random) extends Runnable { - val seqIdNumDigit = 10 // no. of digits for max int value - - val messagesPerThread = config.numMessages / config.numThreads - debug("Messages per thread = " + messagesPerThread) - val props = new Properties() - val producer = - if (config.useNewProducer) { - import org.apache.kafka.clients.producer.ProducerConfig - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) - props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) - props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance") - props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString) - props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString) - props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) - props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) - new NewShinyProducer(props) - } else { - props.put("metadata.broker.list", config.brokerList) - props.put("compression.codec", config.compressionCodec.codec.toString) - props.put("send.buffer.bytes", (64 * 1024).toString) - if (!config.isSync) { - props.put("producer.type", "async") - props.put("batch.num.messages", config.batchSize.toString) - props.put("queue.enqueue.timeout.ms", "-1") - } - props.put("client.id", "producer-performance") - props.put("request.required.acks", config.producerRequestRequiredAcks.toString) - props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString) - props.put("message.send.max.retries", config.producerNumRetries.toString) - props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString) - props.put("serializer.class", classOf[DefaultEncoder].getName) - props.put("key.serializer.class", classOf[NullEncoder[Long]].getName) - new OldProducer(props) - } - - // generate the sequential message ID - private val SEP = ":" // message field separator - private val messageIdLabel = "MessageID" - private val threadIdLabel = "ThreadID" - private val topicLabel = "Topic" - private var leftPaddedSeqId: String = "" - - private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = { - // Each thread gets a unique range of sequential no. for its ids. - // Eg. 1000 msg in 10 threads => 100 msg per thread - // thread 0 IDs : 0 ~ 99 - // thread 1 IDs : 100 ~ 199 - // thread 2 IDs : 200 ~ 299 - // . . . 
- leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId)) - - val msgHeader = topicLabel + SEP + - topic + SEP + - threadIdLabel + SEP + - threadId + SEP + - messageIdLabel + SEP + - leftPaddedSeqId + SEP - - val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x') - debug(seqMsgString) - return seqMsgString.getBytes() - } - - private def generateProducerData(topic: String, messageId: Long): Array[Byte] = { - val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize) - if (config.seqIdMode) { - val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId - generateMessageWithSeqId(topic, seqId, msgSize) - } else { - new Array[Byte](msgSize) - } - } - - override def run { - var bytesSent = 0L - var nSends = 0 - var i: Long = 0L - var message: Array[Byte] = null - - while (i < messagesPerThread) { - try { - config.topics.foreach( - topic => { - message = generateProducerData(topic, i) - producer.send(topic, BigInteger.valueOf(i).toByteArray, message) - bytesSent += message.size - nSends += 1 - if (config.messageSendGapMs > 0) - Thread.sleep(config.messageSendGapMs) - }) - } catch { - case e: Throwable => error("Error when sending message " + new String(message), e) - } - i += 1 - } - try { - producer.close() - } catch { - case e: Throwable => error("Error when closing producer", e) - } - totalBytesSent.addAndGet(bytesSent) - totalMessagesSent.addAndGet(nSends) - allDone.countDown() - } - } -} diff --git a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala b/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala deleted file mode 100644 index c52ada0..0000000 --- a/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.perf - -import java.net.URI -import java.text.SimpleDateFormat -import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest} -import kafka.consumer.SimpleConsumer -import kafka.utils._ -import org.apache.log4j.Logger -import kafka.common.TopicAndPartition - - -/** - * Performance test for the simple consumer - */ -object SimpleConsumerPerformance { - - def main(args: Array[String]) { - val logger = Logger.getLogger(getClass) - val config = new ConsumerPerfConfig(args) - - if(!config.hideHeader) { - if(!config.showDetailedStats) - println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") - else - println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec") - } - - val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize, config.clientId) - - // reset to latest or smallest offset - val topicAndPartition = TopicAndPartition(config.topic, config.partition) - val request = OffsetRequest(Map( - topicAndPartition -> PartitionOffsetRequestInfo(if (config.fromLatest) OffsetRequest.LatestTime else OffsetRequest.EarliestTime, 1) - )) - var offset: Long = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head - - val startMs = System.currentTimeMillis - var done = false - var totalBytesRead = 0L - var totalMessagesRead = 0L - var consumedInterval = 0 - var lastReportTime: Long = startMs - var lastBytesRead = 0L - var lastMessagesRead = 0L - while(!done) { - // TODO: add in the maxWait and minBytes for performance - val request = new FetchRequestBuilder() - .clientId(config.clientId) - .addFetch(config.topic, config.partition, offset, config.fetchSize) - .build() - val fetchResponse = consumer.fetch(request) - - var messagesRead = 0 - var bytesRead = 0 - val messageSet = fetchResponse.messageSet(config.topic, config.partition) - for (message <- messageSet) { - messagesRead += 1 - bytesRead += message.message.payloadSize - } - - if(messagesRead == 0 || totalMessagesRead > config.numMessages) - done = true - else - // we only did one fetch so we find the offset for the first (head) messageset - offset += messageSet.validBytes - - totalBytesRead += bytesRead - totalMessagesRead += messagesRead - consumedInterval += messagesRead - - if(consumedInterval > config.reportingInterval) { - if(config.showDetailedStats) { - val reportTime = System.currentTimeMillis - val elapsed = (reportTime - lastReportTime)/1000.0 - val totalMBRead = ((totalBytesRead-lastBytesRead)*1.0)/(1024*1024) - println(("%s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(reportTime), config.fetchSize, - (totalBytesRead*1.0)/(1024*1024), totalMBRead/elapsed, - totalMessagesRead, (totalMessagesRead-lastMessagesRead)/elapsed)) - } - lastReportTime = SystemTime.milliseconds - lastBytesRead = totalBytesRead - lastMessagesRead = totalMessagesRead - consumedInterval = 0 - } - } - val reportTime = System.currentTimeMillis - val elapsed = (reportTime - startMs) / 1000.0 - - if(!config.showDetailedStats) { - val totalMBRead = (totalBytesRead*1.0)/(1024*1024) - println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), - config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, totalMBRead/elapsed, - totalMessagesRead, totalMessagesRead/elapsed)) - } - System.exit(0) - } - - class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { - val urlOpt = parser.accepts("server", "REQUIRED: The 
hostname of the server to connect to.") - .withRequiredArg - .describedAs("kafka://hostname:port") - .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " + - "offset to consume from, start with the latest message present in the log rather than the earliest message.") - val partitionOpt = parser.accepts("partition", "The topic partition to consume from.") - .withRequiredArg - .describedAs("partition") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(0) - val fetchSizeOpt = parser.accepts("fetch-size", "REQUIRED: The fetch size to use for consumption.") - .withRequiredArg - .describedAs("bytes") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(1024*1024) - val clientIdOpt = parser.accepts("clientId", "The ID of this client.") - .withRequiredArg - .describedAs("clientId") - .ofType(classOf[String]) - .defaultsTo("SimpleConsumerPerformanceClient") - - val options = parser.parse(args : _*) - - for(arg <- List(topicOpt, urlOpt)) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } - val url = new URI(options.valueOf(urlOpt)) - val fetchSize = options.valueOf(fetchSizeOpt).intValue - val fromLatest = options.has(resetBeginningOffsetOpt) - val partition = options.valueOf(partitionOpt).intValue - val topic = options.valueOf(topicOpt) - val numMessages = options.valueOf(numMessagesOpt).longValue - val reportingInterval = options.valueOf(reportingIntervalOpt).intValue - val showDetailedStats = options.has(showDetailedStatsOpt) - val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) - val hideHeader = options.has(hideHeaderOpt) - val clientId = options.valueOf(clientIdOpt).toString - } -} diff --git a/system_test/producer_perf/config/server.properties b/system_test/producer_perf/config/server.properties index 83a1e06..9f8a633 100644 --- a/system_test/producer_perf/config/server.properties +++ b/system_test/producer_perf/config/server.properties @@ -60,10 +60,10 @@ enable.zookeeper=true # zk connection string # comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" -zookeeper.connect=localhost:2181 +zk.connect=localhost:2181 # timeout in ms for connecting to zookeeper -zookeeper.connection.timeout.ms=1000000 +zk.connection.timeout.ms=1000000 # time based topic flush intervals in ms #log.flush.intervals.ms.per.topic=topic:1000
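A rough, hypothetical sketch of driving the relocated kafka.tools performance classes through the generic run-class wrapper; the wrapper script path and all flag values below are illustrative assumptions, while the option names (--broker-list, --topics, --messages, --threads, --server, --topic) come from the option parsers defined in the new source files above:

  # hypothetical invocations; substitute your own broker, topic, and message counts
  bin/kafka-run-class.sh kafka.tools.ProducerPerformance --broker-list localhost:9092 --topics perf-test --messages 1000000 --threads 4
  bin/kafka-run-class.sh kafka.tools.SimpleConsumerPerformance --server kafka://localhost:9092 --topic perf-test --messages 1000000

Both tools print a CSV-style summary line (or per-interval lines with --show-detailed-stats), so the output can be redirected to a file and aggregated afterwards.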