From 830dbe7f9164d1bd8fa3abd449b63813b3b98bc5 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Mon, 24 Nov 2014 17:55:22 -0800 Subject: [PATCH 1/3] add the serializer/deserializer api to the clients --- .../clients/consumer/ByteArrayDeserializer.java | 34 ++++++++++++++++ .../apache/kafka/clients/consumer/Consumer.java | 4 +- .../kafka/clients/consumer/ConsumerConfig.java | 14 ++++++- .../kafka/clients/consumer/ConsumerRecord.java | 16 ++++---- .../kafka/clients/consumer/ConsumerRecords.java | 14 +++---- .../kafka/clients/consumer/Deserializer.java | 38 +++++++++++++++++ .../kafka/clients/consumer/KafkaConsumer.java | 16 ++++++-- .../kafka/clients/consumer/MockConsumer.java | 6 +-- .../clients/producer/ByteArraySerializer.java | 34 ++++++++++++++++ .../kafka/clients/producer/KafkaProducer.java | 24 ++++++++--- .../apache/kafka/clients/producer/Producer.java | 6 +-- .../kafka/clients/producer/ProducerConfig.java | 15 +++++-- .../kafka/clients/producer/ProducerRecord.java | 20 ++++----- .../apache/kafka/clients/producer/Serializer.java | 38 +++++++++++++++++ .../clients/producer/internals/Partitioner.java | 2 +- .../common/errors/DeserializationException.java | 47 ++++++++++++++++++++++ .../common/errors/SerializationException.java | 46 +++++++++++++++++++++ .../main/scala/kafka/producer/BaseProducer.scala | 4 +- .../scala/kafka/producer/KafkaLog4jAppender.scala | 6 +-- core/src/main/scala/kafka/tools/MirrorMaker.scala | 21 +++++----- .../main/scala/kafka/tools/ReplayLogProducer.scala | 4 +- .../scala/kafka/tools/TestEndToEndLatency.scala | 4 +- .../main/scala/kafka/tools/TestLogCleaning.scala | 6 +-- .../kafka/api/ProducerCompressionTest.scala | 4 +- .../kafka/api/ProducerFailureHandlingTest.scala | 32 +++++++-------- .../integration/kafka/api/ProducerSendTest.scala | 16 ++++---- .../test/scala/unit/kafka/utils/TestUtils.scala | 4 +- 27 files changed, 375 insertions(+), 100 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java new file mode 100644 index 0000000..514cbd2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients.consumer; + +import java.util.Map; + +public class ByteArrayDeserializer implements Deserializer { + + @Override + public void configure(Map configs) { + // nothing to do + } + + @Override + public byte[] deserialize(String topic, byte[] data, boolean isKey) { + return data; + } + + @Override + public void close() { + // nothing to do + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 227f564..1bce501 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition; * @see KafkaConsumer * @see MockConsumer */ -public interface Consumer extends Closeable { +public interface Consumer extends Closeable { /** * Incrementally subscribe to the given list of topics. This API is mutually exclusive to @@ -63,7 +63,7 @@ public interface Consumer extends Closeable { * of data is controlled by {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} and {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}. * If no data is available for timeout ms, returns an empty list */ - public Map poll(long timeout); + public Map> poll(long timeout); /** * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 46efc0c..1d64f08 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -151,6 +151,14 @@ public class ConsumerConfig extends AbstractConfig { public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. 
The JmxReporter is always included to register JMX statistics."; + /** key.deserializer */ + public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer"; + private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the Deserializer interface."; + + /** value.deserializer */ + public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"; + private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the Deserializer interface."; + static { /* TODO: add config docs */ config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah") @@ -176,8 +184,10 @@ public class ConsumerConfig extends AbstractConfig { Importance.LOW, METRICS_SAMPLE_WINDOW_MS_DOC) .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC); - + .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC) + .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC) + .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC); + } ConsumerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java index 436d8a4..16af70a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java @@ -18,10 +18,10 @@ import org.apache.kafka.common.TopicPartition; * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the * record is being received and an offset that points to the record in a Kafka partition. 
*/ -public final class ConsumerRecord { +public final class ConsumerRecord { private final TopicPartition partition; - private final byte[] key; - private final byte[] value; + private final K key; + private final V value; private final long offset; private volatile Exception error; @@ -34,7 +34,7 @@ public final class ConsumerRecord { * @param value The record contents * @param offset The offset of this record in the corresponding Kafka partition */ - public ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset) { + public ConsumerRecord(String topic, int partitionId, K key, V value, long offset) { this(topic, partitionId, key, value, offset, null); } @@ -46,7 +46,7 @@ public final class ConsumerRecord { * @param value The record contents * @param offset The offset of this record in the corresponding Kafka partition */ - public ConsumerRecord(String topic, int partitionId, byte[] value, long offset) { + public ConsumerRecord(String topic, int partitionId, V value, long offset) { this(topic, partitionId, null, value, offset); } @@ -60,7 +60,7 @@ public final class ConsumerRecord { this(topic, partitionId, null, null, -1L, error); } - private ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset, Exception error) { + private ConsumerRecord(String topic, int partitionId, K key, V value, long offset, Exception error) { if (topic == null) throw new IllegalArgumentException("Topic cannot be null"); this.partition = new TopicPartition(topic, partitionId); @@ -95,7 +95,7 @@ public final class ConsumerRecord { * The key (or null if no key is specified) * @throws Exception The exception thrown while fetching this record. */ - public byte[] key() throws Exception { + public K key() throws Exception { if (this.error != null) throw this.error; return key; @@ -105,7 +105,7 @@ public final class ConsumerRecord { * The value * @throws Exception The exception thrown while fetching this record. */ - public byte[] value() throws Exception { + public V value() throws Exception { if (this.error != null) throw this.error; return value; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index 2ecfc8a..bdf4b26 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -21,12 +21,12 @@ import java.util.Map.Entry; * A container that holds the list {@link ConsumerRecord} per partition for a particular topic. There is one for every topic returned by a * {@link Consumer#poll(long)} operation. */ -public class ConsumerRecords { +public class ConsumerRecords { private final String topic; - private final Map> recordsPerPartition; + private final Map>> recordsPerPartition; - public ConsumerRecords(String topic, Map> records) { + public ConsumerRecords(String topic, Map>> records) { this.topic = topic; this.recordsPerPartition = records; } @@ -36,16 +36,16 @@ public class ConsumerRecords { * specified, returns records for all partitions * @return The list of {@link ConsumerRecord}s associated with the given partitions. */ - public List records(int... partitions) { - List recordsToReturn = new ArrayList(); + public List> records(int... 
partitions) { + List> recordsToReturn = new ArrayList>(); if(partitions.length == 0) { // return records for all partitions - for(Entry> record : recordsPerPartition.entrySet()) { + for(Entry>> record : recordsPerPartition.entrySet()) { recordsToReturn.addAll(record.getValue()); } } else { for(int partition : partitions) { - List recordsForThisPartition = recordsPerPartition.get(partition); + List> recordsForThisPartition = recordsPerPartition.get(partition); recordsToReturn.addAll(recordsForThisPartition); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java new file mode 100644 index 0000000..c774a19 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.Configurable; + +/** + * + * @param Type to be deserialized into. + * + * A class that implements this interface is expected to have a constructor with no parameter. 
+ */ +public interface Deserializer extends Configurable { + /** + * + * @param topic Topic associated with the data + * @param data Serialized bytes + * @param isKey Is data for key or value + * @return + */ + public T deserialize(String topic, byte[] data, boolean isKey); + + /** + * Close this deserializer + */ + public void close(); +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index fe93afa..c683f40 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -24,7 +24,9 @@ import java.util.Set; import java.util.Map.Entry; import java.util.concurrent.Future; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.producer.Serializer; import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.JmxReporter; @@ -331,7 +333,7 @@ import org.slf4j.LoggerFactory; * } * */ -public class KafkaConsumer implements Consumer { +public class KafkaConsumer implements Consumer { private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); @@ -340,7 +342,9 @@ public class KafkaConsumer implements Consumer { private final Metrics metrics; private final Set subscribedTopics; private final Set subscribedPartitions; - + private final Deserializer keyDeserializer; + private final Deserializer valueDeserializer; + /** * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings * are documented here. Values can be @@ -402,6 +406,12 @@ public class KafkaConsumer implements Consumer { this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG); this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + Deserializer.class); + this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + Deserializer.class); + config.logUnused(); log.debug("Kafka consumer started"); } @@ -488,7 +498,7 @@ public class KafkaConsumer implements Consumer { * @return map of topic to records since the last fetch for the subscribed list of topics and partitions */ @Override - public Map poll(long timeout) { + public Map> poll(long timeout) { // TODO Auto-generated method stub return null; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index c3aad3b..8cab16c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -33,7 +33,7 @@ import org.apache.kafka.common.TopicPartition; * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it * needs to communicate with. Failure to close the consumer after use will leak these resources. 
*/ -public class MockConsumer implements Consumer { +public class MockConsumer implements Consumer { private final Set subscribedPartitions; private final Set subscribedTopics; @@ -90,10 +90,10 @@ public class MockConsumer implements Consumer { } @Override - public Map poll(long timeout) { + public Map> poll(long timeout) { // hand out one dummy record, 1 per topic Map> records = new HashMap>(); - Map recordMetadata = new HashMap(); + Map> recordMetadata = new HashMap>(); for(TopicPartition partition : subscribedPartitions) { // get the last consumed offset long messageSequence = consumedOffsets.get(partition); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java b/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java new file mode 100644 index 0000000..9005b74 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients.producer; + +import java.util.Map; + +public class ByteArraySerializer implements Serializer { + + @Override + public void configure(Map configs) { + // nothing to do + } + + @Override + public byte[] serialize(String topic, byte[] data, boolean isKey) { + return data; + } + + @Override + public void close() { + // nothing to do + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 32f444e..3d30d95 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory; * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it * needs to communicate with. Failure to close the producer after use will leak these resources. */ -public class KafkaProducer implements Producer { +public class KafkaProducer implements Producer { private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class); @@ -75,6 +75,8 @@ public class KafkaProducer implements Producer { private final CompressionType compressionType; private final Sensor errors; private final Time time; + private final Serializer keySerializer; + private final Serializer valueSerializer; /** * A producer is instantiated by providing a set of key-value pairs as configuration. 
Valid configuration strings @@ -145,6 +147,11 @@ public class KafkaProducer implements Producer { this.errors = this.metrics.sensor("errors"); + this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + Serializer.class); + this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + Serializer.class); + config.logUnused(); log.debug("Kafka producer started"); } @@ -161,7 +168,7 @@ public class KafkaProducer implements Producer { * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)} */ @Override - public Future send(ProducerRecord record) { + public Future send(ProducerRecord record) { return send(record, null); } @@ -226,16 +233,19 @@ public class KafkaProducer implements Producer { * indicates no callback) */ @Override - public Future send(ProducerRecord record, Callback callback) { + public Future send(ProducerRecord record, Callback callback) { try { // first make sure the metadata for the topic is available waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); - int partition = partitioner.partition(record, metadata.fetch()); - int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value()); + byte[] serializedKey = keySerializer.serialize(record.topic(), record.key(), true); + byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value(), false); + ProducerRecord serializedRecord = new ProducerRecord(record.topic(), record.partition(), serializedKey, serializedValue); + int partition = partitioner.partition(serializedRecord, metadata.fetch()); + int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); - RecordAccumulator.RecordAppendResult result = accumulator.append(tp, record.key(), record.value(), compressionType, callback); + RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, compressionType, callback); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); @@ -324,6 +334,8 @@ public class KafkaProducer implements Producer { throw new KafkaException(e); } this.metrics.close(); + this.keySerializer.close(); + this.valueSerializer.close(); log.debug("The Kafka producer has closed."); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java index 36e8398..5baa606 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java @@ -31,7 +31,7 @@ import org.apache.kafka.common.PartitionInfo; * @see KafkaProducer * @see MockProducer */ -public interface Producer extends Closeable { +public interface Producer extends Closeable { /** * Send the given record asynchronously and return a future which will eventually contain the response information. 
@@ -39,12 +39,12 @@ public interface Producer extends Closeable { * @param record The record to send * @return A future which will eventually contain the response information */ - public Future send(ProducerRecord record); + public Future send(ProducerRecord record); /** * Send a record and invoke the given callback when the record has been acknowledged by the server */ - public Future send(ProducerRecord record, Callback callback); + public Future send(ProducerRecord record, Callback callback); /** * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 9095caf..9cdc13d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -17,7 +17,6 @@ import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; import java.util.Arrays; -import java.util.List; import java.util.Map; import org.apache.kafka.common.config.AbstractConfig; @@ -173,6 +172,14 @@ public class ProducerConfig extends AbstractConfig { public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"; private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."; + /** key.serializer */ + public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; + private static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the Serializer interface."; + + /** value.serializer */ + public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; + private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the Serializer interface."; + static { config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) @@ -180,7 +187,7 @@ public class ProducerConfig extends AbstractConfig { .define(ACKS_CONFIG, Type.STRING, "1", - in(Arrays.asList("all","-1", "0", "1")), + in(Arrays.asList("all", "-1", "0", "1")), Importance.HIGH, ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) @@ -219,7 +226,9 @@ public class ProducerConfig extends AbstractConfig { 5, atLeast(1), Importance.LOW, - MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC); + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) + .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) + .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC); } ProducerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java index c3181b3..065d4e6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java @@ -20,12 +20,12 
@@ package org.apache.kafka.clients.producer; * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is * present a partition will be assigned in a round-robin fashion. */ -public final class ProducerRecord { +public final class ProducerRecord { private final String topic; private final Integer partition; - private final byte[] key; - private final byte[] value; + private final K key; + private final V value; /** * Creates a record to be sent to a specified topic and partition @@ -35,7 +35,7 @@ public final class ProducerRecord { * @param key The key that will be included in the record * @param value The record contents */ - public ProducerRecord(String topic, Integer partition, byte[] key, byte[] value) { + public ProducerRecord(String topic, Integer partition, K key, V value) { if (topic == null) throw new IllegalArgumentException("Topic cannot be null"); this.topic = topic; @@ -51,7 +51,7 @@ public final class ProducerRecord { * @param key The key that will be included in the record * @param value The record contents */ - public ProducerRecord(String topic, byte[] key, byte[] value) { + public ProducerRecord(String topic, K key, V value) { this(topic, null, key, value); } @@ -61,7 +61,7 @@ public final class ProducerRecord { * @param topic The topic this record should be sent to * @param value The record contents */ - public ProducerRecord(String topic, byte[] value) { + public ProducerRecord(String topic, V value) { this(topic, null, value); } @@ -75,14 +75,14 @@ public final class ProducerRecord { /** * The key (or null if no key is specified) */ - public byte[] key() { + public K key() { return key; } /** * @return The value */ - public byte[] value() { + public V value() { return value; } @@ -95,8 +95,8 @@ public final class ProducerRecord { @Override public String toString() { - String key = this.key == null ? "null" : ("byte[" + this.key.length + "]"); - String value = this.value == null ? "null" : ("byte[" + this.value.length + "]"); + String key = this.key == null ? "null" : this.key.toString(); + String value = this.value == null ? "null" : this.value.toString(); return "ProducerRecord(topic=" + topic + ", partition=" + partition + ", key=" + key + ", value=" + value; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java new file mode 100644 index 0000000..de87f9c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients.producer; + +import org.apache.kafka.common.Configurable; + +/** + * + * @param Type to be serialized from. 
+ * + * A class that implements this interface is expected to have a constructor with no parameter. + */ +public interface Serializer extends Configurable { + /** + * + * @param topic Topic associated with data + * @param data Typed data + * @param isKey Is data for key or value + * @return + */ + public byte[] serialize(String topic, T data, boolean isKey); + + /** + * Close this serializer + */ + public void close(); +} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java index 40e8234..483899d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java @@ -43,7 +43,7 @@ public class Partitioner { * @param record The record being sent * @param cluster The current cluster metadata */ - public int partition(ProducerRecord record, Cluster cluster) { + public int partition(ProducerRecord record, Cluster cluster) { List partitions = cluster.partitionsForTopic(record.topic()); int numPartitions = partitions.size(); if (record.partition() != null) { diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java b/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java new file mode 100644 index 0000000..aa0cd02 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package org.apache.kafka.common.errors; + +import org.apache.kafka.common.KafkaException; + +/** + * Any exception during deserialization in the producer + */ +public class DeserializationException extends KafkaException { + + private static final long serialVersionUID = 1L; + + public DeserializationException(String message, Throwable cause) { + super(message, cause); + } + + public DeserializationException(String message) { + super(message); + } + + public DeserializationException(Throwable cause) { + super(cause); + } + + public DeserializationException() { + super(); + } + + /* avoid the expensive and useless stack trace for serialization exceptions */ + @Override + public Throwable fillInStackTrace() { + return this; + } + +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java b/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java new file mode 100644 index 0000000..00388d1 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.errors; + +import org.apache.kafka.common.KafkaException; + +/** + * Any exception during serialization in the producer + */ +public class SerializationException extends KafkaException { + + private static final long serialVersionUID = 1L; + + public SerializationException(String message, Throwable cause) { + super(message, cause); + } + + public SerializationException(String message) { + super(message); + } + + public SerializationException(Throwable cause) { + super(cause); + } + + public SerializationException() { + super(); + } + + /* avoid the expensive and useless stack trace for serialization exceptions */ + @Override + public Throwable fillInStackTrace() { + return this; + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala index b020793..8e00713 100644 --- a/core/src/main/scala/kafka/producer/BaseProducer.scala +++ b/core/src/main/scala/kafka/producer/BaseProducer.scala @@ -33,10 +33,10 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer { // decide whether to send synchronously based on producer properties val sync = producerProps.getProperty("producer.type", "async").equals("sync") - val producer = new KafkaProducer(producerProps) + val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps) override def send(topic: String, key: Array[Byte], value: Array[Byte]) { - val record = new ProducerRecord(topic, key, value) + val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, key, value) if(sync) { this.producer.send(record).get() } else { diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index 4b5b823..e194942 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -32,7 +32,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { var requiredNumAcks: Int = Int.MaxValue var syncSend: Boolean = false - private var producer: KafkaProducer = null + private var producer: KafkaProducer[Array[Byte],Array[Byte]] = null def getTopic: String = topic def setTopic(topic: String) { this.topic = topic } @@ -60,7 +60,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { throw new MissingConfigException("topic must be specified by the Kafka log4j appender") if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType) if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString) - producer = new KafkaProducer(props) + producer = new KafkaProducer[Array[Byte],Array[Byte]](props) LogLog.debug("Kafka producer connected to " + brokerList) LogLog.debug("Logging for topic: " + topic) } @@ -68,7 +68,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { override def append(event: LoggingEvent) { val message = subAppend(event) LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message) - val response = producer.send(new ProducerRecord(topic, message.getBytes())) + val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message.getBytes())) if (syncSend) response.get } diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index f399105..2126f6e 100644 --- 
a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -39,7 +39,7 @@ object MirrorMaker extends Logging { private var producerThreads: Seq[ProducerThread] = null private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false) - private val shutdownMessage : ProducerRecord = new ProducerRecord("shutdown", "shutdown".getBytes) + private val shutdownMessage : ProducerRecord[Array[Byte],Array[Byte]] = new ProducerRecord[Array[Byte],Array[Byte]]("shutdown", "shutdown".getBytes) def main(args: Array[String]) { @@ -187,9 +187,9 @@ object MirrorMaker extends Logging { class DataChannel(capacity: Int, numInputs: Int, numOutputs: Int) extends KafkaMetricsGroup { - val queues = new Array[BlockingQueue[ProducerRecord]](numOutputs) + val queues = new Array[BlockingQueue[ProducerRecord[Array[Byte],Array[Byte]]]](numOutputs) for (i <- 0 until numOutputs) - queues(i) = new ArrayBlockingQueue[ProducerRecord](capacity) + queues(i) = new ArrayBlockingQueue[ProducerRecord[Array[Byte],Array[Byte]]](capacity) private val counter = new AtomicInteger(new Random().nextInt()) @@ -201,7 +201,7 @@ object MirrorMaker extends Logging { private val waitTake = newMeter("MirrorMaker-DataChannel-WaitOnTake", "percent", TimeUnit.NANOSECONDS) private val channelSizeHist = newHistogram("MirrorMaker-DataChannel-Size") - def put(record: ProducerRecord) { + def put(record: ProducerRecord[Array[Byte],Array[Byte]]) { // If the key of the message is empty, use round-robin to select the queue // Otherwise use the queue based on the key value so that same key-ed messages go to the same queue val queueId = @@ -213,7 +213,7 @@ object MirrorMaker extends Logging { put(record, queueId) } - def put(record: ProducerRecord, queueId: Int) { + def put(record: ProducerRecord[Array[Byte],Array[Byte]], queueId: Int) { val queue = queues(queueId) var putSucceed = false @@ -225,9 +225,9 @@ object MirrorMaker extends Logging { channelSizeHist.update(queue.size) } - def take(queueId: Int): ProducerRecord = { + def take(queueId: Int): ProducerRecord[Array[Byte],Array[Byte]] = { val queue = queues(queueId) - var data: ProducerRecord = null + var data: ProducerRecord[Array[Byte],Array[Byte]] = null while (data == null) { val startTakeTime = SystemTime.nanoseconds data = queue.poll(500, TimeUnit.MILLISECONDS) @@ -254,7 +254,7 @@ object MirrorMaker extends Logging { info("Starting mirror maker consumer thread " + threadName) try { for (msgAndMetadata <- stream) { - val data = new ProducerRecord(msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message) + val data = new ProducerRecord[Array[Byte],Array[Byte]](msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message) mirrorDataChannel.put(data) } } catch { @@ -297,7 +297,7 @@ object MirrorMaker extends Logging { info("Starting mirror maker producer thread " + threadName) try { while (true) { - val data: ProducerRecord = dataChannel.take(threadId) + val data: ProducerRecord[Array[Byte],Array[Byte]] = dataChannel.take(threadId) trace("Sending message with value size %d".format(data.value().size)) if(data eq shutdownMessage) { info("Received shutdown message") @@ -345,5 +345,4 @@ object MirrorMaker extends Logging { } } } -} - +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 3393a3d..f541987 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala 
@@ -124,7 +124,7 @@ object ReplayLogProducer extends Logging { class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging { val shutdownLatch = new CountDownLatch(1) - val producer = new KafkaProducer(config.producerProps) + val producer = new KafkaProducer[Array[Byte],Array[Byte]](config.producerProps) override def run() { info("Starting consumer thread..") @@ -137,7 +137,7 @@ object ReplayLogProducer extends Logging { stream for (messageAndMetadata <- iter) { try { - val response = producer.send(new ProducerRecord(config.outputTopic, + val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](config.outputTopic, messageAndMetadata.key(), messageAndMetadata.message())) if(config.isSync) { response.get() diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala index 67196f3..2ebc7bf 100644 --- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala +++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala @@ -56,7 +56,7 @@ object TestEndToEndLatency { producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString) - val producer = new KafkaProducer(producerProps) + val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps) // make sure the consumer fetcher has started before sending data since otherwise // the consumption from the tail will skip the first message and hence be blocked @@ -67,7 +67,7 @@ object TestEndToEndLatency { val latencies = new Array[Long](numMessages) for (i <- 0 until numMessages) { val begin = System.nanoTime - producer.send(new ProducerRecord(topic, message)) + producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message)) val received = iter.next val elapsed = System.nanoTime - begin // poor man's progress bar diff --git a/core/src/main/scala/kafka/tools/TestLogCleaning.scala b/core/src/main/scala/kafka/tools/TestLogCleaning.scala index 1d4ea93..b81010e 100644 --- a/core/src/main/scala/kafka/tools/TestLogCleaning.scala +++ b/core/src/main/scala/kafka/tools/TestLogCleaning.scala @@ -242,7 +242,7 @@ object TestLogCleaning { val producerProps = new Properties producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) - val producer = new KafkaProducer(producerProps) + val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps) val rand = new Random(1) val keyCount = (messages / dups).toInt val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt") @@ -254,9 +254,9 @@ object TestLogCleaning { val delete = i % 100 < percentDeletes val msg = if(delete) - new ProducerRecord(topic, key.toString.getBytes(), null) + new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), null) else - new ProducerRecord(topic, key.toString.getBytes(), i.toString.getBytes()) + new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), i.toString.getBytes()) producer.send(msg) producedWriter.write(TestRecord(topic, key, i, delete).toString) producedWriter.newLine() diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index 6379f2b..1505fd4 100644 --- 
a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -75,7 +75,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression) props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000") props.put(ProducerConfig.LINGER_MS_CONFIG, "200") - var producer = new KafkaProducer(props) + var producer = new KafkaProducer[Array[Byte],Array[Byte]](props) val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "") try { @@ -89,7 +89,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK // make sure the returned messages are correct val responses = for (message <- messages) - yield producer.send(new ProducerRecord(topic, null, null, message)) + yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, null, null, message)) val futures = responses.toList for ((future, offset) <- futures zip (0 until numRecords)) { assertEquals(offset.toLong, future.get.offset) diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 209a409..a890316 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -52,10 +52,10 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes private var consumer1: SimpleConsumer = null private var consumer2: SimpleConsumer = null - private var producer1: KafkaProducer = null - private var producer2: KafkaProducer = null - private var producer3: KafkaProducer = null - private var producer4: KafkaProducer = null + private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null + private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null + private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null + private var producer4: KafkaProducer[Array[Byte],Array[Byte]] = null private val topic1 = "topic-1" private val topic2 = "topic-2" @@ -93,7 +93,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes TestUtils.createTopic(zkClient, topic1, 1, 2, servers) // send a too-large record - val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) + val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L) } @@ -106,7 +106,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes TestUtils.createTopic(zkClient, topic1, 1, 2, servers) // send a too-large record - val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) + val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) intercept[ExecutionException] { producer2.send(record).get } @@ -118,7 +118,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes @Test def testNonExistentTopic() { // send a record with non-exist topic - val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes) + val record = new ProducerRecord[Array[Byte],Array[Byte]](topic2, null, "key".getBytes, "value".getBytes) 
intercept[ExecutionException] { producer1.send(record).get } @@ -143,7 +143,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize) // send a record with incorrect broker list - val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) + val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes) intercept[ExecutionException] { producer4.send(record).get } @@ -160,7 +160,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes TestUtils.createTopic(zkClient, topic1, 1, 2, servers) // first send a message to make sure the metadata is refreshed - val record1 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) + val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes) producer1.send(record1).get producer2.send(record1).get @@ -180,7 +180,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes val msgSize = producerBufferSize / tooManyRecords val value = new Array[Byte](msgSize) new Random().nextBytes(value) - val record2 = new ProducerRecord(topic1, null, "key".getBytes, value) + val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, value) intercept[KafkaException] { for (i <- 1 to tooManyRecords) @@ -201,7 +201,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes TestUtils.createTopic(zkClient, topic1, 1, 2, servers) // create a record with incorrect partition id, send should fail - val record = new ProducerRecord(topic1, new Integer(1), "key".getBytes, "value".getBytes) + val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, new Integer(1), "key".getBytes, "value".getBytes) intercept[IllegalArgumentException] { producer1.send(record) } @@ -221,7 +221,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes // create topic TestUtils.createTopic(zkClient, topic1, 1, 2, servers) - val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) + val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes) // first send a message to make sure the metadata is refreshed producer1.send(record).get @@ -311,8 +311,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps) - - val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes) + val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes) try { producer3.send(record).get fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas") @@ -333,8 +332,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps) - - val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes) + val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes) // This should work producer3.send(record).get @@ -366,7 +364,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes override def doWork(): Unit = { val responses = for (i <- sent+1 to sent+numRecords) - 
yield producer.send(new ProducerRecord(topic1, null, null, i.toString.getBytes)) + yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes)) val futures = responses.toList try { diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index d407af9..6196060 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -86,24 +86,24 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { TestUtils.createTopic(zkClient, topic, 1, 2, servers) // send a normal record - val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes) + val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes) assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset) // send a record with null value should be ok - val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null) + val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, null) assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset) // send a record with null key should be ok - val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes) + val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), null, "value".getBytes) assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset) // send a record with null part id should be ok - val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) + val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes) assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset) // send a record with null topic should fail try { - val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes) + val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, new Integer(0), "key".getBytes, "value".getBytes) producer.send(record4, callback) fail("Should not allow sending a record without topic") } catch { @@ -140,7 +140,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { TestUtils.createTopic(zkClient, topic, 1, 2, servers) // non-blocking send a list of records - val record0 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) + val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes) for (i <- 1 to numRecords) producer.send(record0) val response0 = producer.send(record0) @@ -182,7 +182,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { val responses = for (i <- 1 to numRecords) - yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes)) + yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes)) val futures = responses.toList futures.map(_.get) for (future <- futures) @@ -228,7 +228,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { try { // Send a message to auto-create the topic - val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) + val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes) assertEquals("Should 
have offset 0", 0L, producer.send(record).get.offset) // double check that the topic is created with leader elected diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 0da774d..94d0028 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -383,7 +383,7 @@ object TestUtils extends Logging { metadataFetchTimeout: Long = 3000L, blockOnBufferFull: Boolean = true, bufferSize: Long = 1024L * 1024L, - retries: Int = 0) : KafkaProducer = { + retries: Int = 0) : KafkaProducer[Array[Byte],Array[Byte]] = { import org.apache.kafka.clients.producer.ProducerConfig val producerProps = new Properties() @@ -395,7 +395,7 @@ object TestUtils extends Logging { producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") - return new KafkaProducer(producerProps) + return new KafkaProducer[Array[Byte],Array[Byte]](producerProps) } /** -- 1.8.5.2 (Apple Git-48) From b8c0c4dd4cae8b480a542b7834e96f36151ca14c Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Mon, 15 Dec 2014 16:39:49 -0800 Subject: [PATCH 2/3] fix imports --- .../apache/kafka/clients/consumer/KafkaConsumer.java | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index c683f40..6a45645 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -12,21 +12,6 @@ */ package org.apache.kafka.clients.consumer; -import java.net.InetSocketAddress; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.Map.Entry; -import java.util.concurrent.Future; - -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.clients.producer.Serializer; import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.JmxReporter; @@ -38,6 +23,9 @@ import org.apache.kafka.common.utils.SystemTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.util.*; + /** * A Kafka client that consumes records from a Kafka cluster. *

-- 1.8.5.2 (Apple Git-48) From 57c76625f7bb303b296a1c8239360f10279b2486 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Mon, 15 Dec 2014 15:35:09 -0800 Subject: [PATCH 3/3] address Neha's comments --- .../consumer/ConsumerRebalanceCallback.java | 4 +- .../kafka/clients/consumer/KafkaConsumer.java | 96 +++++++++++++++------- .../kafka/clients/producer/KafkaProducer.java | 48 +++++++++-- .../common/errors/DeserializationException.java | 2 +- 4 files changed, 109 insertions(+), 41 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java index f026ae4..e4cf7d1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java @@ -35,7 +35,7 @@ public interface ConsumerRebalanceCallback { * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer} * @param partitions The list of partitions that are assigned to the consumer after rebalance */ - public void onPartitionsAssigned(Consumer consumer, Collection partitions); + public void onPartitionsAssigned(Consumer consumer, Collection partitions); /** * A callback method the user can implement to provide handling of offset commits to a customized store on the @@ -46,5 +46,5 @@ public interface ConsumerRebalanceCallback { * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer} * @param partitions The list of partitions that were assigned to the consumer on the last rebalance */ - public void onPartitionsRevoked(Consumer consumer, Collection partitions); + public void onPartitionsRevoked(Consumer consumer, Collection partitions); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 6a45645..7d441e3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -40,12 +40,12 @@ import java.util.*; * a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method. *

  * {@code
- * private Map process(Map records) {
+ * private Map process(Map records) {
  *     Map processedOffsets = new HashMap();
- *     for(Entry recordMetadata : records.entrySet()) {
- *          List recordsPerTopic = recordMetadata.getValue().records();
+ *     for(Entry> recordMetadata : records.entrySet()) {
+ *          List> recordsPerTopic = recordMetadata.getValue().records();
  *          for(int i = 0;i < recordsPerTopic.size();i++) {
- *               ConsumerRecord record = recordsPerTopic.get(i);
+ *               ConsumerRecord record = recordsPerTopic.get(i);
  *               // process record
  *               try {
  *               	processedOffsets.put(record.topicAndPartition(), record.offset());
@@ -70,11 +70,11 @@ import java.util.*;
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "true");
  * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer consumer = new KafkaConsumer(props);
  * consumer.subscribe("foo", "bar");
  * boolean isRunning = true;
  * while(isRunning) {
- *   Map records = consumer.poll(100);
+ *   Map> records = consumer.poll(100);
  *   process(records);
  * }
  * consumer.close();
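
As a concrete illustration of the generics change in the hunk above, here is a minimal sketch of the same auto-commit loop once the consumer is parameterized. The byte-array types, the wiring of the byte-array deserializer added elsewhere in this patch series, and the topic-keyed shape of the poll() result are assumptions for illustration; imports are omitted, as in the surrounding javadoc examples.

    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "10000");
    // assumed wiring: configure the byte-array deserializer from this patch series for both key and value
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
    consumer.subscribe("foo", "bar");
    boolean isRunning = true;
    while (isRunning) {
        // poll() now returns typed records; keying the map by topic is an assumption in this sketch
        Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
        process(records);
    }
    consumer.close();
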
@@ -92,14 +92,14 @@ import java.util.*;
  * props.put("group.id", "test");
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "false");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer consumer = new KafkaConsumer(props);
  * consumer.subscribe("foo", "bar");
  * int commitInterval = 100;
  * int numRecords = 0;
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map> records = consumer.poll(100);
  *     try {
  *         Map lastConsumedOffsets = process(records);
  *         consumedOffsets.putAll(lastConsumedOffsets);
@@ -146,16 +146,17 @@ import java.util.*;
  * props.put("group.id", "test");
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "false");
- * KafkaConsumer consumer = new KafkaConsumer(props,
+ * KafkaConsumer consumer = new KafkaConsumer(
+ *                                            props,
  *                                            new ConsumerRebalanceCallback() {
  *                                                boolean rewindOffsets = true;  // should be retrieved from external application config
- *                                                public void onPartitionsAssigned(Consumer consumer, Collection partitions) {
+ *                                                public void onPartitionsAssigned(Consumer consumer, Collection partitions) {
  *                                                    Map latestCommittedOffsets = consumer.committed(partitions);
  *                                                    Map newOffsets = latestCommittedOffsets;
  *                                                    if(rewindOffsets)
  *                                                        newOffsets = rewindOffsets(latestCommittedOffsets, 100);
  *                                                    consumer.seek(newOffsets);
  *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, Collection partitions) {
+ *                                                public void onPartitionsRevoked(Consumer consumer, Collection partitions) {
  *                                                    consumer.commit(true);
  *                                                }
  *                                                // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages 
@@ -173,7 +174,7 @@ import java.util.*;
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map> records = consumer.poll(100);
  *     Map lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     numRecords += records.size();
@@ -201,13 +202,14 @@ import java.util.*;
  * props.put("group.id", "test");
  * props.put("session.timeout.ms", "1000");
  * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
- * KafkaConsumer consumer = new KafkaConsumer(props,
+ * KafkaConsumer consumer = new KafkaConsumer(
+ *                                            props,
  *                                            new ConsumerRebalanceCallback() {
- *                                                public void onPartitionsAssigned(Consumer consumer, Collection partitions) {
+ *                                                public void onPartitionsAssigned(Consumer consumer, Collection partitions) {
  *                                                    Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
  *                                                    consumer.seek(lastCommittedOffsets);
  *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, Collection partitions) {
+ *                                                public void onPartitionsRevoked(Consumer consumer, Collection partitions) {
  *                                                    Map offsets = getLastConsumedOffsets(partitions);
  *                                                    commitOffsetsToCustomStore(offsets); 
  *                                                }
@@ -224,7 +226,7 @@ import java.util.*;
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map> records = consumer.poll(100);
  *     Map lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     numRecords += records.size();
@@ -249,7 +251,7 @@ import java.util.*;
  * props.put("group.id", "test");
  * props.put("enable.auto.commit", "true");
  * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer consumer = new KafkaConsumer(props);
  * // subscribe to some partitions of topic foo
  * TopicPartition partition0 = new TopicPartition("foo", 0);
  * TopicPartition partition1 = new TopicPartition("foo", 1);
@@ -266,7 +268,7 @@ import java.util.*;
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map> records = consumer.poll(100);
  *     Map lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     for(TopicPartition partition : partitions) {
@@ -288,7 +290,7 @@ import java.util.*;
  * {@code  
  * Properties props = new Properties();
  * props.put("metadata.broker.list", "localhost:9092");
- * KafkaConsumer consumer = new KafkaConsumer(props);
+ * KafkaConsumer consumer = new KafkaConsumer(props);
  * // subscribe to some partitions of topic foo
  * TopicPartition partition0 = new TopicPartition("foo", 0);
  * TopicPartition partition1 = new TopicPartition("foo", 1);
@@ -304,7 +306,7 @@ import java.util.*;
  * boolean isRunning = true;
  * Map consumedOffsets = new HashMap();
  * while(isRunning) {
- *     Map records = consumer.poll(100);
+ *     Map> records = consumer.poll(100);
  *     Map lastConsumedOffsets = process(records);
  *     consumedOffsets.putAll(lastConsumedOffsets);
  *     // commit offsets for partitions 0,1 for topic foo to custom store
@@ -343,7 +345,7 @@ public class KafkaConsumer implements Consumer {
      * @param configs   The consumer configs
      */
     public KafkaConsumer(Map configs) {
-        this(new ConsumerConfig(configs), null);
+        this(new ConsumerConfig(configs), null, null, null);
     }
 
     /**
@@ -356,7 +358,22 @@ public class KafkaConsumer implements Consumer {
      *                  every rebalance operation.  
      */
     public KafkaConsumer(Map configs, ConsumerRebalanceCallback callback) {
-        this(new ConsumerConfig(configs), callback);
+        this(new ConsumerConfig(configs), callback, null, null);
+    }
+
+    /**
+     * A consumer is instantiated by providing a set of key-value pairs as configuration, a {@link ConsumerRebalanceCallback}
+     * implementation, a key and a value {@link Deserializer}.
+     * 

+ * Valid configuration strings are documented at {@link ConsumerConfig} + * @param configs The consumer configs + * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of + * every rebalance operation. + * @param keyDeserializer The deserializer for key that implements {@link Deserializer} + * @param valueDeserializer The deserializer for value that implements {@link Deserializer} + */ + public KafkaConsumer(Map configs, ConsumerRebalanceCallback callback, Deserializer keyDeserializer, Deserializer valueDeserializer) { + this(new ConsumerConfig(configs), callback, keyDeserializer, valueDeserializer); } /** @@ -364,7 +381,7 @@ public class KafkaConsumer implements Consumer { * Valid configuration strings are documented at {@link ConsumerConfig} */ public KafkaConsumer(Properties properties) { - this(new ConsumerConfig(properties), null); + this(new ConsumerConfig(properties), null, null, null); } /** @@ -377,14 +394,25 @@ public class KafkaConsumer implements Consumer { * every rebalance operation. */ public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) { - this(new ConsumerConfig(properties), callback); + this(new ConsumerConfig(properties), callback, null, null); } - private KafkaConsumer(ConsumerConfig config) { - this(config, null); + /** + * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a + * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}. + *

+ * Valid configuration strings are documented at {@link ConsumerConfig} + * @param properties The consumer configuration properties + * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of + * every rebalance operation. + * @param keyDeserializer The deserializer for key that implements {@link Deserializer} + * @param valueDeserializer The deserializer for value that implements {@link Deserializer} + */ + public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback, Deserializer keyDeserializer, Deserializer valueDeserializer) { + this(new ConsumerConfig(properties), callback, keyDeserializer, valueDeserializer); } - private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback) { + private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer keyDeserializer, Deserializer valueDeserializer) { log.trace("Starting the Kafka consumer"); subscribedTopics = new HashSet(); subscribedPartitions = new HashSet(); @@ -395,10 +423,16 @@ public class KafkaConsumer implements Consumer { this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - Deserializer.class); - this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - Deserializer.class); + if (keyDeserializer == null) + this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + Deserializer.class); + else + this.keyDeserializer = keyDeserializer; + if (valueDeserializer == null) + this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + Deserializer.class); + else + this.valueDeserializer = valueDeserializer; config.logUnused(); log.debug("Kafka consumer started"); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3d30d95..7e18b04 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -83,20 +83,47 @@ public class KafkaProducer implements Producer { * are documented here. Values can be * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the * string "42" or the integer 42). + * @param configs The producer configs + * */ public KafkaProducer(Map configs) { - this(new ProducerConfig(configs)); + this(new ProducerConfig(configs), null, null); + } + + /** + * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}. + * Valid configuration strings are documented here. + * Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept + * either the string "42" or the integer 42). 
+ * @param configs The producer configs + * @param keySerializer The serializer for key that implements {@link Serializer} + * @param valueSerializer The serializer for value that implements {@link Serializer} + */ + public KafkaProducer(Map configs, Serializer keySerializer, Serializer valueSerializer) { + this(new ProducerConfig(configs), keySerializer, valueSerializer); } /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings * are documented here. + * @param properties The producer configs */ public KafkaProducer(Properties properties) { - this(new ProducerConfig(properties)); + this(new ProducerConfig(properties), null, null); + } + + /** + * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}. + * Valid configuration strings are documented here. + * @param properties The producer configs + * @param keySerializer The serializer for key that implements {@link Serializer} + * @param valueSerializer The serializer for value that implements {@link Serializer} + */ + public KafkaProducer(Properties properties, Serializer keySerializer, Serializer valueSerializer) { + this(new ProducerConfig(properties), keySerializer, valueSerializer); } - private KafkaProducer(ProducerConfig config) { + private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer) { log.trace("Starting the Kafka producer"); this.time = new SystemTime(); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) @@ -147,10 +174,16 @@ public class KafkaProducer implements Producer { this.errors = this.metrics.sensor("errors"); - this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - Serializer.class); - this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - Serializer.class); + if (keySerializer == null) + this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + Serializer.class); + else + this.keySerializer = keySerializer; + if (valueSerializer == null) + this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + Serializer.class); + else + this.valueSerializer = valueSerializer; config.logUnused(); log.debug("Kafka producer started"); @@ -166,6 +199,7 @@ public class KafkaProducer implements Producer { /** * Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)} + * @param record The record to be sent */ @Override public Future send(ProducerRecord record) { diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java b/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java index aa0cd02..0eeac94 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java @@ -16,7 +16,7 @@ package org.apache.kafka.common.errors; import org.apache.kafka.common.KafkaException; /** - * Any exception during deserialization in the producer + * Any exception during deserialization in the consumer */ public class DeserializationException extends KafkaException { -- 1.8.5.2 (Apple Git-48)
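
Patch 3/3 above adds consumer constructors that accept Deserializer instances directly, falling back to the classes configured via ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG / VALUE_DESERIALIZER_CLASS_CONFIG only when the corresponding argument is null. A minimal sketch of the four-argument form follows; the byte-array deserializer from this patch series and the null rebalance callback are assumptions for illustration, and imports are omitted as in the javadoc examples.

    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:9092");
    props.put("group.id", "test");
    // deserializer instances are passed directly, so the deserializer class configs are not consulted
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
        props,
        null,                          // no ConsumerRebalanceCallback in this sketch (assumed to be allowed)
        new ByteArrayDeserializer(),   // assumed key deserializer
        new ByteArrayDeserializer());  // assumed value deserializer
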
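The producer gains matching overloads in the same patch. A minimal sketch of constructing a producer with explicit serializer instances and sending one record; the byte-array serializer from this series, the bootstrap.servers key, and the topic name are assumptions for illustration.

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker config key
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
        props,
        new ByteArraySerializer(),     // assumed key serializer
        new ByteArraySerializer());    // assumed value serializer
    // ProducerRecord(topic, partition, key, value), mirroring the Scala tests above; a null partition lets the partitioner choose
    Future<RecordMetadata> future =
        producer.send(new ProducerRecord<byte[], byte[]>("topic1", null, "key".getBytes(), "value".getBytes()));
    // future.get() would block until the broker acknowledges the record
    producer.close();
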