From 22aad4494d19a80efa2eb80810d30fd9aae0420d Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Wed, 15 Apr 2015 11:40:46 -0700 Subject: [PATCH] KAFKA-2126: Configure automatically instantiated deserializers in new consumer. --- .../org/apache/kafka/clients/consumer/KafkaConsumer.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 2124334..4132f36 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -486,6 +486,16 @@ public class KafkaConsumer implements Consumer { metricGrpPrefix, metricsTags, this.time); + + if (keyDeserializer == null) { + keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class); + keyDeserializer.configure(config.originals(), true); + } + if (valueDeserializer == null) { + valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class); + valueDeserializer.configure(config.originals(), false); + } + this.fetcher = new Fetcher(this.client, this.retryBackoffMs, config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), @@ -493,8 +503,8 @@ public class KafkaConsumer implements Consumer { config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG), config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(), - keyDeserializer == null ? config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class) : keyDeserializer, - valueDeserializer == null ? config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class) : valueDeserializer, + keyDeserializer, + valueDeserializer, this.metadata, this.subscriptions, metrics, -- 2.3.3