From 76a6284deffa14c56ceee451ae61159f7af7706a Mon Sep 17 00:00:00 2001 From: asingh Date: Tue, 24 Feb 2015 10:31:16 -0800 Subject: [PATCH] KAFKA-1982: change kafka.examples.Producer to use the new java producer --- .../common/serialization/IntegerDeserializer.java | 33 ++++++++ .../common/serialization/IntegerSerializer.java | 33 ++++++++ .../common/serialization/SerializationTest.java | 91 +++++++++++++--------- examples/README | 5 +- .../src/main/java/kafka/examples/Consumer.java | 15 ++-- .../kafka/examples/KafkaConsumerProducerDemo.java | 3 +- .../src/main/java/kafka/examples/Producer.java | 83 ++++++++++++++++---- .../java/kafka/examples/SimpleConsumerDemo.java | 4 +- 8 files changed, 204 insertions(+), 63 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java create mode 100644 clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java new file mode 100644 index 0000000..635c831 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class IntegerDeserializer implements Deserializer { + + public void configure(Map configs, boolean isKey) { + // nothing to do + } + + public Integer deserialize(String topic, byte[] data) { + if (data == null) + return null; + return ByteBuffer.wrap(data).getInt(); + } + + public void close() { + // nothing to do + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java new file mode 100644 index 0000000..da06803 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import java.nio.ByteBuffer; +import java.util.Map; + +public class IntegerSerializer implements Serializer { + + public void configure(Map configs, boolean isKey) { + // nothing to do + } + + public byte[] serialize(String topic, Integer data) { + if (data == null) + return null; + return ByteBuffer.allocate(4).putInt(data).array(); + } + + public void close() { + // nothing to do + } +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java index f5cd61c..026f3da 100644 --- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -23,48 +23,63 @@ import static org.junit.Assert.assertEquals; public class SerializationTest { - private static class SerDeser { - final Serializer serializer; - final Deserializer deserializer; - - public SerDeser(Serializer serializer, Deserializer deserializer) { - this.serializer = serializer; - this.deserializer = deserializer; - } - } + private static class SerDeser { + final Serializer serializer; + final Deserializer deserializer; - @Test - public void testStringSerializer() { - String str = "my string"; - String mytopic = "testTopic"; - List encodings = new ArrayList(); - encodings.add("UTF8"); - encodings.add("UTF-16"); - - for (String encoding : encodings) { - SerDeser serDeser = getStringSerDeser(encoding); - Serializer serializer = serDeser.serializer; - Deserializer deserializer = serDeser.deserializer; - - assertEquals("Should get the original string after serialization and deserialization with encoding " + encoding, - str, deserializer.deserialize(mytopic, serializer.serialize(mytopic, str))); - - assertEquals("Should support null in serialization and deserialization with encoding " + encoding, - null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null))); - } + public SerDeser(Serializer serializer, Deserializer deserializer) { + this.serializer = serializer; + this.deserializer = deserializer; } + } + + @Test + public void testStringSerializer() { + String str = "my string"; + String mytopic = "testTopic"; + List encodings = new ArrayList(); + encodings.add("UTF8"); + encodings.add("UTF-16"); - private SerDeser getStringSerDeser(String encoder) { - Map serializerConfigs = new HashMap(); - serializerConfigs.put("key.serializer.encoding", encoder); - Serializer serializer = new StringSerializer(); - serializer.configure(serializerConfigs, true); + for (String encoding : encodings) { + SerDeser serDeser = getStringSerDeser(encoding); + Serializer serializer = serDeser.serializer; + Deserializer deserializer = serDeser.deserializer; - Map deserializerConfigs = new HashMap(); - deserializerConfigs.put("key.deserializer.encoding", encoder); - Deserializer deserializer = new StringDeserializer(); - deserializer.configure(deserializerConfigs, true); + assertEquals("Should get the original string after serialization and deserialization with encoding " + encoding, + str, deserializer.deserialize(mytopic, serializer.serialize(mytopic, str))); - return new SerDeser(serializer, deserializer); + assertEquals("Should support null in serialization and deserialization with encoding " + encoding, + null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null))); } + } + + @Test + public void testIntegerSerializer() { + Integer integer = 423412424; + String mytopic = "testTopic"; + + Serializer serializer = new IntegerSerializer(); + Deserializer deserializer = new IntegerDeserializer(); + + assertEquals("Should get the original integer after serialization and deserialization", + integer, deserializer.deserialize(mytopic, serializer.serialize(mytopic, integer))); + + assertEquals("Should support null in serialization and deserialization", + null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null))); + } + + private SerDeser getStringSerDeser(String encoder) { + Map serializerConfigs = new HashMap(); + serializerConfigs.put("key.serializer.encoding", encoder); + Serializer serializer = new StringSerializer(); + serializer.configure(serializerConfigs, true); + + Map deserializerConfigs = new HashMap(); + deserializerConfigs.put("key.deserializer.encoding", encoder); + Deserializer deserializer = new StringDeserializer(); + deserializer.configure(deserializerConfigs, true); + + return new SerDeser(serializer, deserializer); + } } diff --git a/examples/README b/examples/README index 53db696..f6e3410 100644 --- a/examples/README +++ b/examples/README @@ -3,6 +3,7 @@ This directory contains examples of client code that uses kafka. To run the demo: 1. Start Zookeeper and the Kafka server - 2. For simple consumer demo, run bin/java-simple-consumer-demo.sh - 3. For unlimited producer-consumer run, run bin/java-producer-consumer-demo.sh + 2. For simple consumer demo, `run bin/java-simple-consumer-demo.sh` + 3. For unlimited sync-producer-consumer run, `run bin/java-producer-consumer-demo.sh sync` + 4. For unlimited async-producer-consumer run, `run bin/java-producer-consumer-demo.sh` diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java index 13135b9..8af64d8 100644 --- a/examples/src/main/java/kafka/examples/Consumer.java +++ b/examples/src/main/java/kafka/examples/Consumer.java @@ -17,14 +17,15 @@ package kafka.examples; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.MessageAndMetadata; public class Consumer extends Thread @@ -54,11 +55,13 @@ public class Consumer extends Thread public void run() { Map topicCountMap = new HashMap(); - topicCountMap.put(topic, new Integer(1)); + topicCountMap.put(topic, 1); Map>> consumerMap = consumer.createMessageStreams(topicCountMap); - KafkaStream stream = consumerMap.get(topic).get(0); - ConsumerIterator it = stream.iterator(); - while(it.hasNext()) - System.out.println(new String(it.next().message())); + KafkaStream stream = consumerMap.get(topic).get(0); + for (MessageAndMetadata messageAndMetadata : stream) { + System.out.println("Received message: (" + ByteBuffer.wrap(messageAndMetadata.key()).getInt() + + ", " + + "" + new String(messageAndMetadata.message()) + ")"); + } } } diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java index 1239394..e96991a 100644 --- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java +++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java @@ -20,7 +20,8 @@ public class KafkaConsumerProducerDemo implements KafkaProperties { public static void main(String[] args) { - Producer producerThread = new Producer(KafkaProperties.topic); + final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true; + Producer producerThread = new Producer(KafkaProperties.topic, isAsync); producerThread.start(); Consumer consumerThread = new Consumer(KafkaProperties.topic); diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java index 96e9893..8562a85 100644 --- a/examples/src/main/java/kafka/examples/Producer.java +++ b/examples/src/main/java/kafka/examples/Producer.java @@ -18,33 +18,88 @@ package kafka.examples; import java.util.Properties; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; +import java.util.concurrent.ExecutionException; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; public class Producer extends Thread { - private final kafka.javaapi.producer.Producer producer; + private final KafkaProducer producer; private final String topic; - private final Properties props = new Properties(); + private final Boolean isAsync; - public Producer(String topic) + public Producer(String topic, Boolean isAsync) { - props.put("serializer.class", "kafka.serializer.StringEncoder"); - props.put("metadata.broker.list", "localhost:9092"); - // Use random partitioner. Don't need the key type. Just set it to Integer. - // The message is of type String. - producer = new kafka.javaapi.producer.Producer(new ProducerConfig(props)); + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("client.id", "DemoProducer"); + props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer(props); this.topic = topic; + this.isAsync = isAsync; } - + public void run() { int messageNo = 1; while(true) { - String messageStr = new String("Message_" + messageNo); - producer.send(new KeyedMessage(topic, messageStr)); - messageNo++; + String messageStr = "Message_" + messageNo; + long startTime = System.currentTimeMillis(); + if (isAsync) { // Send asynchronously + producer.send(new ProducerRecord(topic, + messageNo, + messageStr), new DemoCallBack(startTime, messageNo, messageStr)); + messageNo++; + } else { // Send synchronously + try { + producer.send(new ProducerRecord(topic, + messageNo, + messageStr)).get(); + System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + messageNo++; + } } } +} + +class DemoCallBack implements Callback { + + private long startTime; + private int key; + private String message; + public DemoCallBack(long startTime, int key, String message) { + this.startTime = startTime; + this.key = key; + this.message = message; + } + + /** + * A callback method the user can implement to provide asynchronous handling of request completion. This method will + * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be + * non-null. + * + * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error + * occurred. + * @param exception The exception thrown during processing of this record. Null if no error occurred. + */ + public void onCompletion(RecordMetadata metadata, Exception exception) { + long elapsedTime = System.currentTimeMillis() - startTime; + if (metadata != null) { + System.out.println( + "Record [" + key + ", " + message + "] sent to partition [" + metadata.partition() + "], " + + "offset [" + metadata.offset() + "] in " + elapsedTime + " ms"); + } else { + exception.printStackTrace(); + } + } } diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java index 0d66fe5..e5096f0 100644 --- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java +++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java @@ -42,9 +42,9 @@ public class SimpleConsumerDemo { } private static void generateData() { - Producer producer2 = new Producer(KafkaProperties.topic2); + Producer producer2 = new Producer(KafkaProperties.topic2, false); producer2.start(); - Producer producer3 = new Producer(KafkaProperties.topic3); + Producer producer3 = new Producer(KafkaProperties.topic3, false); producer3.start(); try { Thread.sleep(1000); -- 1.9.3 (Apple Git-50)