From 64ca7d090c73aed4164599fe086f60a8079139a1 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Mon, 5 Jan 2015 11:09:50 -0800 Subject: [PATCH] addressing Jay's comments --- .../clients/consumer/ByteArrayDeserializer.java | 34 ----------- .../kafka/clients/consumer/ConsumerConfig.java | 4 +- .../kafka/clients/consumer/Deserializer.java | 38 ------------ .../kafka/clients/consumer/KafkaConsumer.java | 37 ++++++++++-- .../clients/producer/ByteArraySerializer.java | 34 ----------- .../kafka/clients/producer/KafkaProducer.java | 59 +++++++++++++++---- .../kafka/clients/producer/ProducerConfig.java | 4 +- .../apache/kafka/clients/producer/Serializer.java | 38 ------------ .../apache/kafka/common/config/AbstractConfig.java | 13 +++-- .../common/errors/DeserializationException.java | 47 --------------- .../serialization/ByteArrayDeserializer.java | 34 +++++++++++ .../common/serialization/ByteArraySerializer.java | 34 +++++++++++ .../kafka/common/serialization/Deserializer.java | 45 ++++++++++++++ .../kafka/common/serialization/Serializer.java | 45 ++++++++++++++ .../common/serialization/StringDeserializer.java | 44 ++++++++++++++ .../common/serialization/StringSerializer.java | 44 ++++++++++++++ .../common/serialization/SerializationTest.java | 68 ++++++++++++++++++++++ .../scala/kafka/producer/KafkaLog4jAppender.scala | 2 + .../main/scala/kafka/tools/ConsoleProducer.scala | 2 + core/src/main/scala/kafka/tools/MirrorMaker.scala | 7 ++- .../scala/kafka/tools/ProducerPerformance.scala | 2 + .../main/scala/kafka/tools/ReplayLogProducer.scala | 2 + .../scala/kafka/tools/TestEndToEndLatency.scala | 2 + .../main/scala/kafka/tools/TestLogCleaning.scala | 2 + .../kafka/api/ProducerCompressionTest.scala | 2 + .../integration/kafka/api/ProducerSendTest.scala | 53 +++++++++++++++++ .../test/scala/unit/kafka/utils/TestUtils.scala | 2 + 27 files changed, 479 insertions(+), 219 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java 
delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java delete mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java delete mode 100644 clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java delete mode 100644 clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java create mode 100644 clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java create mode 100644 clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java create mode 100644 clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java create mode 100644 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java create mode 100644 clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java create mode 100644 clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java create mode 100644 clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java deleted file mode 100644 index 514cbd2..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.kafka.clients.consumer; - -import java.util.Map; - -public class ByteArrayDeserializer implements Deserializer { - - @Override - public void configure(Map configs) { - // nothing to do - } - - @Override - public byte[] deserialize(String topic, byte[] data, boolean isKey) { - return data; - } - - @Override - public void close() { - // nothing to do - } -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 1d64f08..57c1807 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -185,8 +185,8 @@ public class ConsumerConfig extends AbstractConfig { METRICS_SAMPLE_WINDOW_MS_DOC) .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC) .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC) - .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC) - .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.consumer.ByteArrayDeserializer", Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC); + .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC) + .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC); } diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java deleted file mode 100644 index c774a19..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.kafka.clients.consumer; - -import org.apache.kafka.common.Configurable; - -/** - * - * @param Type to be deserialized into. - * - * A class that implements this interface is expected to have a constructor with no parameter. 
- */ -public interface Deserializer extends Configurable { - /** - * - * @param topic Topic associated with the data - * @param data Serialized bytes - * @param isKey Is data for key or value - * @return - */ - public T deserialize(String topic, byte[] data, boolean isKey); - - /** - * Close this deserializer - */ - public void close(); -} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index fe90663..7f8a41c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.ClientUtils; import org.apache.kafka.common.utils.SystemTime; import org.slf4j.Logger; @@ -345,7 +346,7 @@ public class KafkaConsumer implements Consumer { * @param configs The consumer configs */ public KafkaConsumer(Map configs) { - this(new ConsumerConfig(configs), null, null, null); + this(configs, null); } /** @@ -358,7 +359,7 @@ public class KafkaConsumer implements Consumer { * every rebalance operation. */ public KafkaConsumer(Map configs, ConsumerRebalanceCallback callback) { - this(new ConsumerConfig(configs), callback, null, null); + this(configs, callback, null, null); } /** @@ -375,7 +376,19 @@ public class KafkaConsumer implements Consumer { * won't be called when the deserializer is passed in directly. 
*/ public KafkaConsumer(Map configs, ConsumerRebalanceCallback callback, Deserializer keyDeserializer, Deserializer valueDeserializer) { - this(new ConsumerConfig(configs), callback, keyDeserializer, valueDeserializer); + this(new ConsumerConfig(addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), + callback, keyDeserializer, valueDeserializer); + } + + private static Map addDeserializerToConfig(Map configs, + Deserializer keyDeserializer, Deserializer valueDeserializer) { + Map newConfigs = new HashMap(); + newConfigs.putAll(configs); + if (keyDeserializer != null) + newConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass()); + if (valueDeserializer != null) + newConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass()); + return newConfigs; + } /** @@ -383,7 +396,7 @@ public class KafkaConsumer implements Consumer { * Valid configuration strings are documented at {@link ConsumerConfig} */ public KafkaConsumer(Properties properties) { - this(new ConsumerConfig(properties), null, null, null); + this(properties, null); } /** @@ -396,7 +409,7 @@ public class KafkaConsumer implements Consumer { * every rebalance operation. */ public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) { - this(new ConsumerConfig(properties), callback, null, null); + this(properties, callback, null, null); } /** @@ -413,7 +426,19 @@ public class KafkaConsumer implements Consumer { * won't be called when the deserializer is passed in directly. 
*/ public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback, Deserializer keyDeserializer, Deserializer valueDeserializer) { - this(new ConsumerConfig(properties), callback, keyDeserializer, valueDeserializer); + this(new ConsumerConfig(addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)), + callback, keyDeserializer, valueDeserializer); + } + + private static Properties addDeserializerToConfig(Properties properties, + Deserializer keyDeserializer, Deserializer valueDeserializer) { + Properties newProperties = new Properties(); + newProperties.putAll(properties); + if (keyDeserializer != null) + newProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName()); + if (valueDeserializer != null) + newProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); + return newProperties; + } private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer keyDeserializer, Deserializer valueDeserializer) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java b/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java deleted file mode 100644 index 9005b74..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.kafka.clients.producer; - -import java.util.Map; - -public class ByteArraySerializer implements Serializer { - - @Override - public void configure(Map configs) { - // nothing to do - } - - @Override - public byte[] serialize(String topic, byte[] data, boolean isKey) { - return data; - } - - @Override - public void close() { - // nothing to do - } -} diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index d859fc5..db23a12 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -13,10 +13,7 @@ package org.apache.kafka.clients.producer; import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Properties; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -34,6 +31,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; @@ -44,6 +42,7 @@ import 
org.apache.kafka.common.network.Selector; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.ClientUtils; import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.SystemTime; @@ -102,7 +101,19 @@ public class KafkaProducer implements Producer { * be called when the serializer is passed in directly. */ public KafkaProducer(Map configs, Serializer keySerializer, Serializer valueSerializer) { - this(new ProducerConfig(configs), keySerializer, valueSerializer); + this(new ProducerConfig(addSerializerToConfig(configs, keySerializer, valueSerializer)), + keySerializer, valueSerializer); + } + + private static Map addSerializerToConfig(Map configs, + Serializer keySerializer, Serializer valueSerializer) { + Map newConfigs = new HashMap(); + newConfigs.putAll(configs); + if (keySerializer != null) + newConfigs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass()); + if (valueSerializer != null) + newConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass()); + return newConfigs; } /** @@ -124,7 +135,19 @@ public class KafkaProducer implements Producer { * be called when the serializer is passed in directly. 
*/ public KafkaProducer(Properties properties, Serializer keySerializer, Serializer valueSerializer) { - this(new ProducerConfig(properties), keySerializer, valueSerializer); + this(new ProducerConfig(addSerializerToConfig(properties, keySerializer, valueSerializer)), + keySerializer, valueSerializer); + } + + private static Properties addSerializerToConfig(Properties properties, + Serializer keySerializer, Serializer valueSerializer) { + Properties newProperties = new Properties(); + newProperties.putAll(properties); + if (keySerializer != null) + newProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass().getName()); + if (valueSerializer != null) + newProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer.getClass().getName()); + return newProperties; } private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serializer valueSerializer) { @@ -178,14 +201,18 @@ public class KafkaProducer implements Producer { this.errors = this.metrics.sensor("errors"); - if (keySerializer == null) + if (keySerializer == null) { this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); + this.keySerializer.configure(config.originals(), true); + } else this.keySerializer = keySerializer; - if (valueSerializer == null) + if (valueSerializer == null) { this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); + this.valueSerializer.configure(config.originals(), false); + } else this.valueSerializer = valueSerializer; @@ -275,8 +302,20 @@ public class KafkaProducer implements Producer { try { // first make sure the metadata for the topic is available waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); - byte[] serializedKey = keySerializer.serialize(record.topic(), record.key(), true); - byte[] serializedValue = valueSerializer.serialize(record.topic(), record.value(), false); + byte[] serializedKey; 
+ try { + serializedKey = keySerializer.serialize(record.topic(), record.key()); + } catch (ClassCastException cce) { + throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + + " to the one specified in key.serializer"); + } + byte[] serializedValue; + try { + serializedValue = valueSerializer.serialize(record.topic(), record.value()); + } catch (ClassCastException cce) { + throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + + " to the one specified in value.serializer"); + } ProducerRecord serializedRecord = new ProducerRecord(record.topic(), record.partition(), serializedKey, serializedValue); int partition = partitioner.partition(serializedRecord, metadata.fetch()); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 9cdc13d..c3d810c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -227,8 +227,8 @@ public class ProducerConfig extends AbstractConfig { atLeast(1), Importance.LOW, MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC) - .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) - .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC); + .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) + .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC); } ProducerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java deleted file mode 100644 index de87f9c..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.kafka.clients.producer; - -import org.apache.kafka.common.Configurable; - -/** - * - * @param Type to be serialized from. - * - * A class that implements this interface is expected to have a constructor with no parameter. 
- */ -public interface Serializer extends Configurable { - /** - * - * @param topic Topic associated with data - * @param data Typed data - * @param isKey Is data for key or value - * @return - */ - public byte[] serialize(String topic, T data, boolean isKey); - - /** - * Close this serializer - */ - public void close(); -} diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 3d4ab72..c4fa058 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -12,12 +12,7 @@ */ package org.apache.kafka.common.config; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.KafkaException; @@ -97,6 +92,12 @@ public class AbstractConfig { return keys; } + public Map originals() { + Map copy = new HashMap(); + copy.putAll(originals); + return copy; + } + private void logAll() { StringBuilder b = new StringBuilder(); b.append(getClass().getSimpleName()); diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java b/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java deleted file mode 100644 index a543339..0000000 --- a/clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.kafka.common.errors; - -import org.apache.kafka.common.KafkaException; - -/** - * Any exception during deserialization in the consumer - */ -public class DeserializationException extends KafkaException { - - private static final long serialVersionUID = 1L; - - public DeserializationException(String message, Throwable cause) { - super(message, cause); - } - - public DeserializationException(String message) { - super(message); - } - - public DeserializationException(Throwable cause) { - super(cause); - } - - public DeserializationException() { - super(); - } - - /* avoid the expensive and useless stack trace for deserialization exceptions */ - @Override - public Throwable fillInStackTrace() { - return this; - } - -} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java new file mode 100644 index 0000000..d89b3ff --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.serialization; + +import java.util.Map; + +public class ByteArrayDeserializer implements Deserializer { + + @Override + public void configure(Map configs, boolean isKey) { + // nothing to do + } + + @Override + public byte[] deserialize(String topic, byte[] data) { + return data; + } + + @Override + public void close() { + // nothing to do + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java new file mode 100644 index 0000000..beaef94 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.serialization; + +import java.util.Map; + +public class ByteArraySerializer implements Serializer { + + @Override + public void configure(Map configs, boolean isKey) { + // nothing to do + } + + @Override + public byte[] serialize(String topic, byte[] data) { + return data; + } + + @Override + public void close() { + // nothing to do + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java new file mode 100644 index 0000000..267b94a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package org.apache.kafka.common.serialization; + +import java.util.Map; + +/** + * + * @param Type to be deserialized into. + * + * A class that implements this interface is expected to have a constructor with no parameter. + */ +public interface Deserializer { + + /** + * Configure this class. + * @param configs configs in key/value pairs + * @param isKey whether is for key or value + */ + public void configure(Map configs, boolean isKey); + + /** + * + * @param topic Topic associated with the data + * @param data Serialized bytes + * @return + */ + public T deserialize(String topic, byte[] data); + + /** + * Close this deserializer + */ + public void close(); +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java new file mode 100644 index 0000000..83780c5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.serialization; + +import java.util.Map; + +/** + * + * @param Type to be serialized from. 
+ * + * A class that implements this interface is expected to have a constructor with no parameter. + */ +public interface Serializer { + + /** + * Configure this class. + * @param configs configs in key/value pairs + * @param isKey whether is for key or value + */ + public void configure(Map configs, boolean isKey); + + /** + * + * @param topic Topic associated with data + * @param data Typed data + * @return + */ + public byte[] serialize(String topic, T data); + + /** + * Close this serializer + */ + public void close(); +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java new file mode 100644 index 0000000..5ee2ac7 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.serialization; + +import org.apache.kafka.common.errors.SerializationException; + +import java.io.UnsupportedEncodingException; +import java.util.Map; + +public class StringDeserializer implements Deserializer { + private String encoding = "UTF8"; + + @Override + public void configure(Map configs, boolean isKey) { + String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding"; + Object encodingValue = configs.get(propertyName); + if (encodingValue != null && encodingValue instanceof String) + encoding = (String) encodingValue; + } + + @Override + public String deserialize(String topic, byte[] data) { + try { + return new String(data, encoding); + } catch (UnsupportedEncodingException e) { + throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); + } + } + + @Override + public void close() { + // nothing to do + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java new file mode 100644 index 0000000..ede08dc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import org.apache.kafka.common.errors.SerializationException; + +import java.io.UnsupportedEncodingException; +import java.util.Map; + +public class StringSerializer implements Serializer<String> { + private String encoding = "UTF8"; + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding"; + Object encodingValue = configs.get(propertyName); + if (encodingValue != null && encodingValue instanceof String) + encoding = (String) encodingValue; + } + + @Override + public byte[] serialize(String topic, String data) { + try { + return data.getBytes(encoding); + } catch (UnsupportedEncodingException e) { + throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding); + } + } + + @Override + public void close() { + // nothing to do + } +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java new file mode 100644 index 0000000..d550a31 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class SerializationTest { + + private static class SerDeser { + final Serializer<String> serializer; + final Deserializer<String> deserializer; + + public SerDeser(Serializer<String> serializer, Deserializer<String> deserializer) { + this.serializer = serializer; + this.deserializer = deserializer; + } + } + + @Test + public void testStringSerializer() { + String str = "my string"; + String mytopic = "testTopic"; + List<String> encodings = new ArrayList<String>(); + encodings.add("UTF8"); + encodings.add("UTF-16"); + + for ( String encoding : encodings) { + SerDeser serDeser = getStringSerDeser(encoding); + Serializer<String> serializer = serDeser.serializer; + Deserializer<String> deserializer = serDeser.deserializer; + + assertEquals("Should get the original string after serialization and deserialization with encoding " + encoding, + str, deserializer.deserialize(mytopic, serializer.serialize(mytopic, str))); + } + + } + + private SerDeser getStringSerDeser(String encoder) { + Map<String, Object> serializerConfigs = new HashMap<String, Object>(); + serializerConfigs.put("key.serializer.encoding", encoder); + Serializer<String> serializer = new StringSerializer(); + serializer.configure(serializerConfigs, true); + + Map<String, Object> deserializerConfigs = new HashMap<String, Object>(); + deserializerConfigs.put("key.deserializer.encoding", encoder); + Deserializer<String> deserializer = new StringDeserializer(); + deserializer.configure(deserializerConfigs,
true); + + return new SerDeser(serializer, deserializer); + } +} diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index e194942..652dfb8 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -60,6 +60,8 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { throw new MissingConfigException("topic must be specified by the Kafka log4j appender") if(compressionType != null) props.put(org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType) if(requiredNumAcks != Int.MaxValue) props.put(org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG, requiredNumAcks.toString) + props.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + props.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") producer = new KafkaProducer[Array[Byte],Array[Byte]](props) LogLog.debug("Kafka producer connected to " + brokerList) LogLog.debug("Logging for topic: " + topic) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 397d80d..a680b62 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -59,6 +59,8 @@ object ConsoleProducer { props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.maxMemoryBytes.toString) props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.maxPartitionMemoryBytes.toString) props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer") new NewShinyProducer(props) } else { diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 2126f6e..cd6ccb5 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -22,7 +22,7 @@ import kafka.metrics.KafkaMetricsGroup import kafka.producer.{BaseProducer, NewShinyProducer, OldProducer} import kafka.serializer._ import kafka.utils._ -import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import java.util.Random import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} @@ -133,8 +133,11 @@ object MirrorMaker extends Logging { producerThreads = (0 until numProducers).map(i => { producerProps.setProperty("client.id", clientId + "-" + i) val producer = - if (useNewProducer) + if (useNewProducer) { + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") new NewShinyProducer(producerProps) + } else new OldProducer(producerProps) new ProducerThread(mirrorDataChannel, producer, i) diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index f2dc4ed..bc25cd2 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -188,6 +188,8 @@ object ProducerPerformance extends Logging { props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString) props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString) props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name) + 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") new NewShinyProducer(props) } else { props.put("metadata.broker.list", config.brokerList) diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index f541987..2b8537b 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -120,6 +120,8 @@ object ReplayLogProducer extends Logging { import scala.collection.JavaConversions._ val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt)) producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") } class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging { diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala index 2ebc7bf..48cff20 100644 --- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala +++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala @@ -56,6 +56,8 @@ object TestEndToEndLatency { producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString) + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer") val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps) // make sure the consumer fetcher has started before sending data since otherwise diff --git a/core/src/main/scala/kafka/tools/TestLogCleaning.scala b/core/src/main/scala/kafka/tools/TestLogCleaning.scala index b81010e..af496f7 100644 --- a/core/src/main/scala/kafka/tools/TestLogCleaning.scala +++ b/core/src/main/scala/kafka/tools/TestLogCleaning.scala @@ -242,6 +242,8 @@ object TestLogCleaning { val producerProps = new Properties producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true") producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl) + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps) val rand = new Random(1) val keyCount = (messages / dups).toInt diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index 1505fd4..e635588 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -75,6 +75,8 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression) props.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000") props.put(ProducerConfig.LINGER_MS_CONFIG, "200") + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") var producer = new 
KafkaProducer[Array[Byte],Array[Byte]](props) val consumer = new SimpleConsumer("localhost", port, 100, 1024*1024, "") diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index 6196060..b15237b 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -30,6 +30,10 @@ import kafka.consumer.SimpleConsumer import kafka.api.FetchRequestBuilder import kafka.message.Message import kafka.integration.KafkaServerTestHarness +import org.apache.kafka.common.errors.SerializationException +import java.util.Properties +import org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.serialization.ByteArraySerializer class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { @@ -126,6 +130,55 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } } + @Test + def testSerializer() { + // send a record with a wrong type should receive a serialization exception + try { + val producer = createNewProducerWithWrongSerializer(brokerList) + val record5 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes) + producer.send(record5) + fail("Should have gotten a SerializationException") + } catch { + case se: SerializationException => // this is ok + } + + try { + createNewProducerWithNoSerializer(brokerList) + fail("Instantiating a producer without specifying a serializer should cause a ConfigException") + } catch { + case ce : ConfigException => // this is ok + } + + // create a producer with explicit serializers should succeed + createNewProducerWithExplicitSerializer(brokerList) + } + + private def createNewProducerWithWrongSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = { + import org.apache.kafka.clients.producer.ProducerConfig + + val producerProps = new Properties() + 
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + return new KafkaProducer[Array[Byte],Array[Byte]](producerProps) + } + + private def createNewProducerWithNoSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = { + import org.apache.kafka.clients.producer.ProducerConfig + + val producerProps = new Properties() + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + return new KafkaProducer[Array[Byte],Array[Byte]](producerProps) + } + + private def createNewProducerWithExplicitSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = { + import org.apache.kafka.clients.producer.ProducerConfig + + val producerProps = new Properties() + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer, new ByteArraySerializer) + } + /** * testClose checks the closing behavior * diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 94d0028..c9e8ba2 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -395,6 +395,8 @@ object TestUtils extends Logging { producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") return new 
KafkaProducer[Array[Byte],Array[Byte]](producerProps) } -- 1.8.5.2 (Apple Git-48)