Index: core/src/main/scala/kafka/tools/ProducerShell.scala
===================================================================
--- core/src/main/scala/kafka/tools/ProducerShell.scala	(revision 1198630)
+++ core/src/main/scala/kafka/tools/ProducerShell.scala	(working copy)
@@ -23,6 +23,7 @@
 import kafka.message._
 import kafka.producer._
 import java.util.Properties
+import kafka.utils.Utils
 
 /**
  * Interactive shell for producing messages from the command line
@@ -32,9 +33,9 @@
   def main(args: Array[String]) {
 
     val parser = new OptionParser
-    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+    val producerPropsOpt = parser.accepts("props", "REQUIRED: Properties file with the producer properties.")
                            .withRequiredArg
-                           .describedAs("kafka://hostname:port")
+                           .describedAs("properties")
                            .ofType(classOf[String])
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic to produce to.")
                            .withRequiredArg
@@ -43,7 +44,7 @@
 
     val options = parser.parse(args : _*)
 
-    for(arg <- List(urlOpt, topicOpt)) {
+    for(arg <- List(producerPropsOpt, topicOpt)) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
@@ -51,15 +52,10 @@
       }
     }
 
-    val url = new URI(options.valueOf(urlOpt))
+    val propsFile = options.valueOf(producerPropsOpt)
+    val producerConfig = new ProducerConfig(Utils.loadProps(propsFile))
     val topic = options.valueOf(topicOpt)
-    val props = new Properties()
-    props.put("host", url.getHost)
-    props.put("port", url.getPort.toString)
-    props.put("buffer.size", "65536")
-    props.put("connect.timeout.ms", "10000")
-    props.put("reconnect.interval", "100")
-    val producer = new SyncProducer(new SyncProducerConfig(props))
+    val producer = new Producer[String, String](producerConfig)
 
     val input = new BufferedReader(new InputStreamReader(System.in))
     var done = false
@@ -68,10 +64,9 @@
       if(line == null) {
         done = true
       } else {
-        val lineBytes = line.trim.getBytes()
-        val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(lineBytes))
-        producer.send(topic, messageList)
-        println("Sent: %s (%d bytes)".format(line, messageList.sizeInBytes))
+        val message = line.trim
+        producer.send(new ProducerData[String, String](topic, message))
+        println("Sent: %s (%d bytes)".format(line, message.getBytes.length))
       }
     }
     producer.close()
Index: config/producer.properties
===================================================================
--- config/producer.properties	(revision 0)
+++ config/producer.properties	(revision 0)
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.producer.ProducerConfig for more details
+
+# need to set either broker.list or zk.connect
+
+# configure brokers statically
+# format: brokerid1:host1:port1,brokerid2:host2:port2 ...
+broker.list=0:localhost:9092
+
+# discover brokers from ZK
+#zk.connect=
+
+# name of the partitioner class for partitioning events; default partition spreads data randomly
+#partitioner.class=
+
+# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
+producer.type=sync
+
+# specify the compression codec for all data generated: 0: no compression, 1: gzip
+compression.codec=0
+
+# message encoder
+serializer.class=kafka.serializer.StringEncoder
+
+
+# allow topic level compression
+#compressed.topics=
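
Reviewer note: with this patch the shell is started with --props and --topic (pointing --props at the new config/producer.properties) instead of the old --server URI. For reference, below is a minimal standalone sketch of the producer pattern the patch switches to (Producer, ProducerConfig and ProducerData from kafka.producer), assuming the settings from config/producer.properties inlined as a java.util.Properties object, a hypothetical topic named "test" and a hypothetical object name ProducerSketch; it is an illustration of the API usage, not part of the patch itself.

import java.util.Properties
import kafka.producer.{Producer, ProducerConfig, ProducerData}

object ProducerSketch {
  def main(args: Array[String]) {
    // Same settings as config/producer.properties, inlined so the sketch is self-contained.
    val props = new Properties()
    props.put("broker.list", "0:localhost:9092")
    props.put("producer.type", "sync")
    props.put("compression.codec", "0")
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    // The high-level producer replaces the old hand-wired SyncProducer.
    val producer = new Producer[String, String](new ProducerConfig(props))
    // ProducerData pairs a topic with a message, as ProducerShell now does per input line.
    producer.send(new ProducerData[String, String]("test", "hello kafka"))
    producer.close()
  }
}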