From 0f0517a3546779ac1601d6ba12166ae4937b8e16 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Thu, 12 Mar 2015 08:50:27 -0700
Subject: [PATCH 1/5] merge 1892

---
 .../java/org/apache/kafka/clients/consumer/internals/Coordinator.java | 1 -
 .../java/org/apache/kafka/clients/consumer/internals/Fetcher.java     | 4 ++--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 436f9b2..1bdcb05 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -212,7 +212,6 @@ public final class Coordinator {
             // parse the response to get the offsets
             boolean offsetsReady = true;
             OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody());
-            // TODO: needs to handle disconnects
             Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
             for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                 TopicPartition tp = entry.getKey();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 8b71fba..95c28fb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -236,8 +236,8 @@ public class Fetcher {
                         topicPartition);
                     awaitMetadataUpdate();
                 } else {
-                    // TODO: we should not just throw exceptions but should handle and log it.
-                    Errors.forCode(errorCode).maybeThrow();
+                    throw new IllegalStateException("Unexpected error code " + errorCode +
+                        " while fetching offsets for partition " + topicPartition + ".");
                 }
             }
         } else {
-- 
1.7.12.4

From da9fb8da46362183aa40441826991bf6d27f80d0 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Thu, 12 Mar 2015 10:48:47 -0700
Subject: [PATCH 2/5] dummy

---
 .../main/scala/kafka/tools/ConsoleConsumer.scala | 285 +++++++++++++--------
 .../main/scala/kafka/tools/ConsoleProducer.scala |   4 +-
 2 files changed, 176 insertions(+), 113 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 910691e..ef97cbf 100644
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -27,7 +27,9 @@ import kafka.message._
 import kafka.serializer._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsReporter
-import kafka.consumer.{Blacklist,Whitelist,ConsumerConfig,Consumer}
+import kafka.consumer.{OldConsumer, NewShinyConsumer, Blacklist, Whitelist}
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
 
 /**
  * Consumer that dumps messages out to standard out.
@@ -36,63 +38,193 @@ import kafka.consumer.{Blacklist,Whitelist,ConsumerConfig,Consumer}
 object ConsoleConsumer extends Logging {
 
   def main(args: Array[String]) {
+    val config = new ConsumerConfig(args)
+
+    val consumer =
+      if(config.useNewConsumer) {
+        new NewShinyConsumer(config.topicIdOpt, getNewConsumerProps(config))
+      } else {
+        new OldConsumer(config.filterSpec, getOldConsumerProps(config))
+      }
+
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run() {
+        consumer.close()
+      }
+    })
+
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run() {
+        connector.shutdown()
+        // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
+        if(!config.groupIdPassed)
+          ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id"))
+      }
+    })
+
+    var numMessages = 0L
+    val formatter: MessageFormatter = config.messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+    formatter.init(config.formatterArgs)
+
+    try {
+
+
+      var message: KeyedMessage[Array[Byte], Array[Byte]] = null
+      do {
+        message = reader.readMessage()
+        if(message != null)
+          producer.send(message.topic, message.key, message.message)
+      } while(message != null)
+    } catch {
+      case e: Exception =>
+        e.printStackTrace
+        System.exit(1)
+    }
+    System.exit(0)
+  }
+
+
+  try {
+    val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0)
+    val iter = if(maxMessages >= 0)
+      stream.slice(0, config.maxMessages)
+    else
+      stream
+
+    for(messageAndTopic <- iter) {
+      try {
+        formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
+        numMessages += 1
+      } catch {
+        case e: Throwable =>
+          if (skipMessageOnError)
+            error("Error processing message, skipping this message: ", e)
+          else
+            throw e
+      }
+      if(System.out.checkError()) {
+        // This means no one is listening to our output stream any more, time to shutdown
+        System.err.println("Unable to write to standard out, closing consumer.")
+        System.err.println("Consumed %d messages".format(numMessages))
+        formatter.close()
+        connector.shutdown()
+        System.exit(1)
+      }
+    }
+  } catch {
+    case e: Throwable => error("Error processing message, stopping consumer: ", e)
+  }
+  System.err.println("Consumed %d messages".format(numMessages))
+  System.out.flush()
+  formatter.close()
+  connector.shutdown()
+
+  def getOldConsumerProps(config: ConsumerConfig): Properties = {
+    val props = new Properties
+
+    props.putAll(config.consumerProps)
+
+    if(!props.containsKey("group.id")) {
+      props.put("group.id", "console-consumer-" + new Random().nextInt(100000))
+      config.groupIdPassed = false
+    }
+    props.put("auto.offset.reset", if(options.has(config.resetBeginningOpt)) "smallest" else "largest")
+    props.put("zookeeper.connect", options.valueOf(config.zkConnectOpt))
+
+    if (!options.has(config.deleteConsumerOffsetsOpt) && options.has(config.resetBeginningOpt) &&
+      checkZkPathExists(options.valueOf(config.zkConnectOpt),"/consumers/" + props.getProperty("group.id")+ "/offsets")) {
+      System.err.println("Found previous offset information for this group "+ props.getProperty("group.id")
+        +". Please use --delete-consumer-offsets to delete previous offsets metadata")
+      System.exit(1)
+    }
+
+    if(options.has(config.deleteConsumerOffsetsOpt))
+      ZkUtils.maybeDeletePath(options.valueOf(config.zkConnectOpt), "/consumers/" + props.getProperty("group.id"))
+
+    props
+  }
+
+  def getNewConsumerProps(config: ConsumerConfig): Properties = {
+    val props = new Properties
+
+    props.putAll(config.consumerProps)
+
+    if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+      props.put(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-" + new Random().nextInt(100000))
+      config.groupIdPassed = false
+    }
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if(options.has(resetBeginningOpt)) "earliest" else "latest")
+
+    props
+  }
+
+  class ConsumerConfig(args: Array[String]) {
     val parser = new OptionParser
     val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")
-          .withRequiredArg
-          .describedAs("topic")
-          .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
     val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
-          .withRequiredArg
-          .describedAs("whitelist")
-          .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("whitelist")
+      .ofType(classOf[String])
     val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.")
-          .withRequiredArg
-          .describedAs("blacklist")
-          .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("blacklist")
+      .ofType(classOf[String])
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
-          "Multiple URLS can be given to allow fail-over.")
-          .withRequiredArg
-          .describedAs("urls")
-          .ofType(classOf[String])
+      "Multiple URLS can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
     val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
-          .withRequiredArg
-          .describedAs("config file")
-          .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("config file")
+      .ofType(classOf[String])
     val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
-          .withRequiredArg
-          .describedAs("class")
-          .ofType(classOf[String])
-          .defaultsTo(classOf[DefaultMessageFormatter].getName)
+      .withRequiredArg
+      .describedAs("class")
+      .ofType(classOf[String])
+      .defaultsTo(classOf[DefaultMessageFormatter].getName)
     val messageFormatterArgOpt = parser.accepts("property")
-          .withRequiredArg
-          .describedAs("prop")
-          .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("prop")
+      .ofType(classOf[String])
     val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up");
     val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
-          "start with the earliest message present in the log rather than the latest message.")
+      "start with the earliest message present in the log rather than the latest message.")
     val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
-          .withRequiredArg
-          .describedAs("num_messages")
-          .ofType(classOf[java.lang.Integer])
+      .withRequiredArg
+      .describedAs("num_messages")
+      .ofType(classOf[java.lang.Integer])
     val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
-          "skip it instead of halt.")
+      "skip it instead of halt.")
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
-          "set, the csv metrics will be outputed here")
+      "set, the csv metrics will be outputed here")
       .withRequiredArg
       .describedAs("metrics dictory")
       .ofType(classOf[java.lang.String])
+    val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.")
 
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
-
+
     var groupIdPassed = true
     val options: OptionSet = tryParse(parser, args)
-    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+    val useNewConsumer = options.has(useNewConsumerOpt)
+    val filterOpt = List(whitelistOpt, blacklistOpt).filter(options.has)
     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
-    if (topicOrFilterOpt.size != 1)
-      CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
+    if (useNewConsumer) {
+      // TODO: this should be lifted after the new consumer has been fully developed
+      if (filterOpt.size > 0)
+        CommandLineUtils.printUsageAndDie(parser, "Can only consume topic with new consumer.")
+    } else {
+      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+      if (topicOrFilterOpt.size != 1)
+        CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
+    }
     val topicArg = options.valueOf(topicOrFilterOpt.head)
     val filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg)
@@ -113,91 +245,22 @@ object ConsoleConsumer extends Logging {
       KafkaMetricsReporter.startReporters(verifiableProps)
     }
 
-    val consumerProps = if (options.has(consumerConfigOpt))
-      Utils.loadProps(options.valueOf(consumerConfigOpt))
-    else
-      new Properties()
+    val consumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerConfigOpt))
 
-    if(!consumerProps.containsKey("group.id")) {
-      consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))
-      groupIdPassed=false
-    }
-    consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
-    consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt))
-
-    if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) &&
-      checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) {
-      System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id")
-        +". Please use --delete-consumer-offsets to delete previous offsets metadata")
-      System.exit(1)
-    }
-
-    if(options.has(deleteConsumerOffsetsOpt))
-      ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id"))
-
-    val config = new ConsumerConfig(consumerProps)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
     val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
-    val connector = Consumer.create(config)
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run() {
-        connector.shutdown()
-        // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
-        if(!groupIdPassed)
-          ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id"))
-      }
-    })
-
-    var numMessages = 0L
-    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
-    formatter.init(formatterArgs)
-    try {
-      val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0)
-      val iter = if(maxMessages >= 0)
-        stream.slice(0, maxMessages)
-      else
-        stream
-
-      for(messageAndTopic <- iter) {
-        try {
-          formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
-          numMessages += 1
-        } catch {
-          case e: Throwable =>
-            if (skipMessageOnError)
-              error("Error processing message, skipping this message: ", e)
-            else
-              throw e
-        }
-        if(System.out.checkError()) {
-          // This means no one is listening to our output stream any more, time to shutdown
-          System.err.println("Unable to write to standard out, closing consumer.")
-          System.err.println("Consumed %d messages".format(numMessages))
-          formatter.close()
-          connector.shutdown()
-          System.exit(1)
+    def tryParse(parser: OptionParser, args: Array[String]) = {
+      try {
+        parser.parse(args : _*)
+      } catch {
+        case e: OptionException => {
+          Utils.croak(e.getMessage)
+          null
         }
       }
-    } catch {
-      case e: Throwable => error("Error processing message, stopping consumer: ", e)
-    }
-    System.err.println("Consumed %d messages".format(numMessages))
-    System.out.flush()
-    formatter.close()
-    connector.shutdown()
-  }
-
-  def tryParse(parser: OptionParser, args: Array[String]) = {
-    try {
-      parser.parse(args : _*)
-    } catch {
-      case e: OptionException => {
-        Utils.croak(e.getMessage)
-        null
-      }
-    }
   }
 }
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 00265f9..6971e6e 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -74,7 +74,7 @@ object ConsoleProducer {
 
   def getOldProducerProps(config: ProducerConfig): Properties = {
-    val props = new Properties;
+    val props = new Properties
 
     props.putAll(config.extraProducerProps)
 
@@ -100,7 +100,7 @@ object ConsoleProducer {
 
   def getNewProducerProps(config: ProducerConfig): Properties = {
-    val props = new Properties;
+    val props = new Properties
 
     props.putAll(config.extraProducerProps)
 
-- 
1.7.12.4

From f3bd99f81a36c866ff5cf66e869603cef0b65bec Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Fri, 13 Mar 2015 10:18:41 -0700
Subject: [PATCH 3/5] KAFKA-2015.v1

---
 .../main/scala/kafka/tools/ConsoleConsumer.scala | 73 ++++++++--------
 1 file changed, 27 insertions(+), 46 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index ef97cbf..1ec0dc1 100644
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -24,7 +24,6 @@ import java.util.Properties
 import java.util.Random
 import java.io.PrintStream
 import kafka.message._
-import kafka.serializer._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsReporter
 import kafka.consumer.{OldConsumer, NewShinyConsumer, Blacklist, Whitelist}
@@ -42,7 +41,7 @@ object ConsoleConsumer extends Logging {
 
     val consumer =
       if(config.useNewConsumer) {
-        new NewShinyConsumer(config.topicIdOpt, getNewConsumerProps(config))
+        new NewShinyConsumer(config.topicArg, getNewConsumerProps(config))
       } else {
         new OldConsumer(config.filterSpec, getOldConsumerProps(config))
       }
@@ -53,71 +52,43 @@ object ConsoleConsumer extends Logging {
       }
     })
 
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run() {
-        connector.shutdown()
-        // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
-        if(!config.groupIdPassed)
-          ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id"))
-      }
-    })
-
     var numMessages = 0L
     val formatter: MessageFormatter = config.messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(config.formatterArgs)
 
     try {
+      while (config.maxMessages < 0 || numMessages < config.maxMessages) {
+        val record = consumer.receive()
 
-
-      var message: KeyedMessage[Array[Byte], Array[Byte]] = null
-      do {
-        message = reader.readMessage()
-        if(message != null)
-          producer.send(message.topic, message.key, message.message)
-      } while(message != null)
-    } catch {
-      case e: Exception =>
-        e.printStackTrace
-        System.exit(1)
-    }
-    System.exit(0)
-  }
-
-
-  try {
-    val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0)
-    val iter = if(maxMessages >= 0)
-      stream.slice(0, config.maxMessages)
-    else
-      stream
-
-    for(messageAndTopic <- iter) {
       try {
-        formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
+        formatter.writeTo(record.key, record.value, System.out)
         numMessages += 1
       } catch {
         case e: Throwable =>
-          if (skipMessageOnError)
+          if (config.skipMessageOnError)
             error("Error processing message, skipping this message: ", e)
           else
             throw e
       }
+
       if(System.out.checkError()) {
         // This means no one is listening to our output stream any more, time to shutdown
         System.err.println("Unable to write to standard out, closing consumer.")
         System.err.println("Consumed %d messages".format(numMessages))
         formatter.close()
-        connector.shutdown()
+        consumer.close()
         System.exit(1)
       }
     }
   } catch {
     case e: Throwable => error("Error processing message, stopping consumer: ", e)
   }
+
   System.err.println("Consumed %d messages".format(numMessages))
   System.out.flush()
   formatter.close()
-  connector.shutdown()
+  consumer.close()
+  }
 
   def getOldConsumerProps(config: ConsumerConfig): Properties = {
     val props = new Properties
@@ -128,18 +99,26 @@ object ConsoleConsumer extends Logging {
       props.put("group.id", "console-consumer-" + new Random().nextInt(100000))
       config.groupIdPassed = false
     }
-    props.put("auto.offset.reset", if(options.has(config.resetBeginningOpt)) "smallest" else "largest")
-    props.put("zookeeper.connect", options.valueOf(config.zkConnectOpt))
+    props.put("auto.offset.reset", if(config.options.has(config.resetBeginningOpt)) "smallest" else "largest")
+    props.put("zookeeper.connect", config.options.valueOf(config.zkConnectOpt))
 
-    if (!options.has(config.deleteConsumerOffsetsOpt) && options.has(config.resetBeginningOpt) &&
-      checkZkPathExists(options.valueOf(config.zkConnectOpt),"/consumers/" + props.getProperty("group.id")+ "/offsets")) {
+    if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) &&
+      checkZkPathExists(config.options.valueOf(config.zkConnectOpt),"/consumers/" + props.getProperty("group.id")+ "/offsets")) {
       System.err.println("Found previous offset information for this group "+ props.getProperty("group.id")
         +". Please use --delete-consumer-offsets to delete previous offsets metadata")
       System.exit(1)
     }
 
-    if(options.has(config.deleteConsumerOffsetsOpt))
-      ZkUtils.maybeDeletePath(options.valueOf(config.zkConnectOpt), "/consumers/" + props.getProperty("group.id"))
+    if(config.options.has(config.deleteConsumerOffsetsOpt))
+      ZkUtils.maybeDeletePath(config.options.valueOf(config.zkConnectOpt), "/consumers/" + props.getProperty("group.id"))
+
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run() {
+        // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
+        if(!config.groupIdPassed)
+          ZkUtils.maybeDeletePath(config.options.valueOf(config.zkConnectOpt), "/consumers/" + props.get("group.id"))
+      }
+    })
 
     props
   }
@@ -153,7 +132,7 @@ object ConsoleConsumer extends Logging {
       props.put(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-" + new Random().nextInt(100000))
       config.groupIdPassed = false
     }
-    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if(options.has(resetBeginningOpt)) "earliest" else "latest")
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if(config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
 
     props
   }
@@ -218,6 +197,8 @@ object ConsoleConsumer extends Logging {
     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
     if (useNewConsumer) {
       // TODO: this should be lifted after the new consumer has been fully developed
+      if (!options.has(topicIdOpt))
+        CommandLineUtils.printUsageAndDie(parser, "Must specify the topic name with new consumer.")
       if (filterOpt.size > 0)
         CommandLineUtils.printUsageAndDie(parser, "Can only consume topic with new consumer.")
     } else {
-- 
1.7.12.4

From 5516d69307d586d5ad325eaf8ff65c25ff85dc82 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Fri, 13 Mar 2015 15:10:45 -0700
Subject: [PATCH 4/5] dummy

---
 config/tools-log4j.properties                         | 2 +-
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 7 ++++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/config/tools-log4j.properties b/config/tools-log4j.properties
index 52f07c9..9e2d8d2 100644
--- a/config/tools-log4j.properties
+++ b/config/tools-log4j.properties
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=WARN, stdout
+log4j.rootLogger=TRACE, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 1ec0dc1..4b19052 100644
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -132,6 +132,8 @@ object ConsoleConsumer extends Logging {
       props.put(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-" + new Random().nextInt(100000))
       config.groupIdPassed = false
     }
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if(config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
 
     props
@@ -226,7 +228,10 @@ object ConsoleConsumer extends Logging {
       KafkaMetricsReporter.startReporters(verifiableProps)
     }
 
-    val consumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerConfigOpt))
+    val consumerProps = if (options.has(consumerConfigOpt))
+      Utils.loadProps(options.valueOf(consumerConfigOpt))
+    else
+      new Properties()
 
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
-- 
1.7.12.4

From 37f4656f018badac116fdc08fcb5a93ce7b6c1e7 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Fri, 13 Mar 2015 15:24:48 -0700
Subject: [PATCH 5/5] dummy

---
 config/tools-log4j.properties                    |  2 +-
 .../main/scala/kafka/consumer/BaseConsumer.scala | 78 ++++++++++++++++++++++
 .../main/scala/kafka/tools/ConsoleConsumer.scala |  4 +-
 3 files changed, 82 insertions(+), 2 deletions(-)
 create mode 100644 core/src/main/scala/kafka/consumer/BaseConsumer.scala

diff --git a/config/tools-log4j.properties b/config/tools-log4j.properties
index 9e2d8d2..10cd6c2 100644
--- a/config/tools-log4j.properties
+++ b/config/tools-log4j.properties
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=TRACE, stdout
+log4j.rootLogger=WARN, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
new file mode 100644
index 0000000..88a8e44
--- /dev/null
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.Properties
+
+// A base consumer used whenever we need to have options for both old and new consumer;
+// this class will be removed once we deprecate old consumer
+trait BaseConsumer {
+  def receive(): BaseConsumerRecord
+  def close()
+}
+
+case class BaseConsumerRecord (topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])
+
+class NewShinyConsumer(topic: String, consumerProps: Properties) extends BaseConsumer {
+  import org.apache.kafka.clients.consumer.KafkaConsumer
+  import org.apache.kafka.common.TopicPartition
+  import scala.collection.JavaConversions._
+
+  // decide whether to send synchronously based on producer properties
+  val consumer = new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
+  // subscribe to topic
+  // TODO: by subscribing to all partitions
+  val partitionsInfo = consumer.partitionsFor(topic)
+  for (partitionInfo <- partitionsInfo) {
+    consumer.subscribe(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
+  }
+  var recordIter = consumer.poll(0).iterator()
+
+  override def receive(): BaseConsumerRecord = {
+    while (!recordIter.hasNext)
+      recordIter = consumer.poll(0).iterator()
+
+    val record = recordIter.next()
+    BaseConsumerRecord(record.topic(), record.partition(), record.offset(), record.key(), record.value())
+  }
+
+  override def close() {
+    this.consumer.close()
+  }
+}
+
+class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends BaseConsumer {
+  import kafka.consumer.{Consumer, ConsumerConfig}
+  import kafka.serializer.DefaultDecoder
+
+  val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
+  val stream: KafkaStream[Array[Byte],Array[Byte]] =
+    consumerConnector.createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder()).head
+  val iter = stream.iterator()
+
+  override def receive(): BaseConsumerRecord = {
+    // we do not need to check hasNext for KafkaStream iterator
+    val messageAndMetadata = iter.next()
+    BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset, messageAndMetadata.key(), messageAndMetadata.message())
+  }
+
+  override def close() {
+    this.consumerConnector.shutdown()
+  }
+}
+
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 4b19052..495af22 100644
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -187,7 +187,9 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("metrics dictory")
       .ofType(classOf[java.lang.String])
-    val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.")
+    // TODO: remove the NOTE after the new consumer has been fully developed
+    val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation; NOTE: without the rebalance " +
+      "implementation a single consumer will try to subscribe to all partitions of the given topic.")
 
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
-- 
1.7.12.4
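Reviewer note: the sketch below is not part of the patch set; it is a minimal illustration of how the BaseConsumer trait introduced in PATCH 5/5 is meant to be driven, mirroring the receive loop that PATCH 3/5 puts in ConsoleConsumer.main. The broker address, group id, topic name, and message cap are placeholder assumptions; the real tool assembles its Properties in getNewConsumerProps (PATCH 4/5 adds the ByteArrayDeserializer settings shown here).

    import java.util.Properties
    import kafka.consumer.{BaseConsumer, BaseConsumerRecord, NewShinyConsumer}

    object BaseConsumerSketch {
      def main(args: Array[String]) {
        val props = new Properties
        // placeholder connection/group settings for a local broker
        props.put("bootstrap.servers", "localhost:9092")
        props.put("group.id", "console-consumer-sketch")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

        // NewShinyConsumer subscribes to every partition of the topic (see the TODO in BaseConsumer.scala)
        val consumer: BaseConsumer = new NewShinyConsumer("test", props)
        try {
          var consumed = 0
          while (consumed < 10) {
            // receive() blocks, re-polling until a record is available
            val record: BaseConsumerRecord = consumer.receive()
            println("%s-%d@%d".format(record.topic, record.partition, record.offset))
            consumed += 1
          }
        } finally {
          consumer.close()
        }
      }
    }

Swapping in OldConsumer with a Whitelist filter drives the identical loop, which is the point of keeping the shared trait until the old consumer is deprecated.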