From d1f255aaca0d280e805eb315c92b0700d9957e43 Mon Sep 17 00:00:00 2001 From: asingh Date: Tue, 24 Feb 2015 10:31:16 -0800 Subject: [PATCH] KAFKA-1982: change kafka.examples.Producer to use the new java producer --- .../src/main/java/kafka/examples/Producer.java | 25 +++++++++++----------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index 96e9893..a745ca6 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -18,33 +18,32 @@ package kafka.examples; import java.util.Properties; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; public class Producer extends Thread { - private final kafka.javaapi.producer.Producer producer; + private final KafkaProducer producer; private final String topic; - private final Properties props = new Properties(); public Producer(String topic) { - props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("metadata.broker.list", "localhost:9092"); - // Use random partitioner. Don't need the key type. Just set it to Integer. - // The message is of type String. - producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props)); + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("client.id", "DemoProducer"); + props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer(props); this.topic = topic; } - + public void run() { int messageNo = 1; while(true) { - String messageStr = new String("Message_" + messageNo); - producer.send(new KeyedMessage(topic, messageStr)); + String messageStr = "Message_" + messageNo; + producer.send(new ProducerRecord(topic, topic.getBytes(), messageStr)); messageNo++; } } - } -- 1.9.3 (Apple Git-50)