From 13799bf60b7400037b4ed2a6427d4edd40d75d4b Mon Sep 17 00:00:00 2001 From: asingh Date: Tue, 24 Feb 2015 10:31:16 -0800 Subject: [PATCH] KAFKA-1982: change kafka.examples.Producer to use the new java producer --- .../common/serialization/IntegerDeserializer.java | 37 ++++++++++++++ .../common/serialization/IntegerSerializer.java | 37 ++++++++++++++ .../src/main/java/kafka/examples/Consumer.java | 15 +++--- .../src/main/java/kafka/examples/Producer.java | 58 +++++++++++++++++----- 4 files changed, 129 insertions(+), 18 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java create mode 100644 clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java new file mode 100644 index 0000000..17d49ac --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import java.nio.ByteBuffer; +import java.util.Map; + +/** + * String encoding defaults to UTF8 and can be customized by setting the property key.deserializer.encoding, + * value.deserializer.encoding or deserializer.encoding. The first two take precedence over the last. + */ +public class IntegerDeserializer implements Deserializer { + + public void configure(Map configs, boolean isKey) { + // nothing to do + } + + public Integer deserialize(String topic, byte[] data) { + if (data == null) + return null; + return ByteBuffer.wrap(data).getInt(); + } + + public void close() { + // nothing to do + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java new file mode 100644 index 0000000..953c20d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import java.nio.ByteBuffer; +import java.util.Map; + +/** + * String encoding defaults to UTF8 and can be customized by setting the property key.serializer.encoding, + * value.serializer.encoding or serializer.encoding. The first two take precedence over the last. + */ +public class IntegerSerializer implements Serializer { + + public void configure(Map configs, boolean isKey) { + // nothing to do + } + + public byte[] serialize(String topic, Integer data) { + if (data == null) + return null; + return ByteBuffer.allocate(4).putInt(data).array(); + } + + public void close() { + // nothing to do + } +} \ No newline at end of file diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 13135b9..8af64d8 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -17,14 +17,15 @@ package kafka.examples; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.MessageAndMetadata; public class Consumer extends Thread @@ -54,11 +55,13 @@ public class Consumer extends Thread public void run() { Map topicCountMap = new HashMap(); - topicCountMap.put(topic, new Integer(1)); + topicCountMap.put(topic, 1); Map>> consumerMap = consumer.createMessageStreams(topicCountMap); - KafkaStream stream = consumerMap.get(topic).get(0); - ConsumerIterator it = stream.iterator(); - while(it.hasNext()) - System.out.println(new String(it.next().message())); + KafkaStream stream = consumerMap.get(topic).get(0); + for (MessageAndMetadata messageAndMetadata : stream) { + System.out.println("Received message: (" + ByteBuffer.wrap(messageAndMetadata.key()).getInt() + + ", " + + "" + new String(messageAndMetadata.message()) + ")"); + } } } diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index 96e9893..e2d0426 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -18,33 +18,67 @@ package kafka.examples; import java.util.Properties; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; public class Producer extends Thread { - private final kafka.javaapi.producer.Producer producer; + private final KafkaProducer producer; private final String topic; - private final Properties props = new Properties(); public Producer(String topic) { - props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("metadata.broker.list", "localhost:9092"); - // Use random partitioner. Don't need the key type. Just set it to Integer. - // The message is of type String. - producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props)); + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("client.id", "DemoProducer"); + props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer(props); this.topic = topic; } - + public void run() { int messageNo = 1; while(true) { - String messageStr = new String("Message_" + messageNo); - producer.send(new KeyedMessage(topic, messageStr)); + String messageStr = "Message_" + messageNo; + long startTime = System.currentTimeMillis(); + producer.send(new ProducerRecord(topic, + messageNo, + messageStr), new DemoCallBack(startTime, messageNo, messageStr)); messageNo++; } } +} + +class DemoCallBack implements Callback { + + private long startTime; + private int key; + private String message; + + public DemoCallBack(long startTime, int key, String message) { + this.startTime = startTime; + this.key = key; + this.message = message; + } + /** + * A callback method the user can implement to provide asynchronous handling of request completion. This method will + * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be + * non-null. + * + * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error + * occurred. + * @param exception The exception thrown during processing of this record. Null if no error occurred. + */ + public void onCompletion(RecordMetadata metadata, Exception exception) { + long elapsedTime = System.currentTimeMillis() - startTime; + System.out.println( + "Record [" + key + ", " + message + "] sent to partition [" + metadata.partition() + "], " + + "offset [" + metadata.offset() + "] in " + elapsedTime + " ms"); + } } -- 1.9.3 (Apple Git-50)