From 293ef26887e59f0c9829a45027ec7d33d60f6445 Mon Sep 17 00:00:00 2001 From: asingh Date: Tue, 24 Feb 2015 10:31:16 -0800 Subject: [PATCH] KAFKA-1982: change kafka.examples.Producer to use the new java producer --- .../src/main/java/kafka/examples/Consumer.java | 2 +- .../src/main/java/kafka/examples/Producer.java | 61 +++++++++++++++++----- 2 files changed, 50 insertions(+), 13 deletions(-) diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 13135b9..732f42a 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -59,6 +59,6 @@ public class Consumer extends Thread KafkaStream stream = consumerMap.get(topic).get(0); ConsumerIterator it = stream.iterator(); while(it.hasNext()) - System.out.println(new String(it.next().message())); + System.out.println("Received message: " + new String(it.next().message())); } } diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index 96e9893..9edf488 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -18,33 +18,70 @@ package kafka.examples; import java.util.Properties; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; +import java.util.concurrent.Future; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; public class Producer extends Thread { - private final kafka.javaapi.producer.Producer producer; + private final KafkaProducer producer; private final String topic; - private final Properties props = new Properties(); public Producer(String topic) { - props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("metadata.broker.list", "localhost:9092"); - // Use random partitioner. Don't need the key type. Just set it to Integer. - // The message is of type String. - producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props)); + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("client.id", "DemoProducer"); + props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer(props); this.topic = topic; } - + public void run() { int messageNo = 1; while(true) { - String messageStr = new String("Message_" + messageNo); - producer.send(new KeyedMessage(topic, messageStr)); + String messageStr = "Message_" + messageNo; + long startTime = System.currentTimeMillis(); + producer.send(new ProducerRecord(topic, + topic.getBytes(), + messageStr), new DemoCallBack(startTime, messageStr)); messageNo++; } } +} + +class DemoCallBack implements Callback { + + private long startTime; + private String message; + + public DemoCallBack(long startTime, String message) { + this.startTime = startTime; + this.message = message; + } + /** + * A callback method the user can implement to provide asynchronous handling of request completion. This method will + * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be + * non-null. + * + * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error + * occurred. + * @param exception The exception thrown during processing of this record. Null if no error occurred. + */ + public void onCompletion(RecordMetadata metadata, Exception exception) { + long elapsedTime = System.currentTimeMillis() - startTime; + System.out.println( + "==================================================\n" + + "Record : " + message + "\n" + + "Time to send : " + elapsedTime + " ms\n" + + "Partition of the record: " + metadata.partition() + "\n" + + "Offset of the record : " + metadata.offset() + "\n" + + "=================================================="); + } } -- 1.9.3 (Apple Git-50)