diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 2d1aa79bc..b82f982b8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -45,14 +45,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.ThreadMetadata; -import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; -import org.apache.kafka.streams.processor.internals.GlobalStreamThread; -import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.apache.kafka.streams.processor.internals.ProcessorTopology; -import org.apache.kafka.streams.processor.internals.StateDirectory; -import org.apache.kafka.streams.processor.internals.StreamThread; -import org.apache.kafka.streams.processor.internals.StreamsMetadataState; -import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator; +import org.apache.kafka.streams.processor.internals.*; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.StreamsMetadata; @@ -670,6 +663,14 @@ public class KafkaStreams implements AutoCloseable { // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception final ProcessorTopology taskTopology = internalTopologyBuilder.build(); + for (SinkNode sn : taskTopology.sinks()) { + sn.getKeySer().configure(config.originals(), true); + sn.getValueSer().configure(config.originals(), false); + } + for (SourceNode sn : taskTopology.sources()) { + sn.getKeyDeSer().configure(config.originals(), true); + sn.getValueDeSer().configure(config.originals(), false); + } streamsMetadataState = new StreamsMetadataState( internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); @@ -683,6 +684,15 @@ public class KafkaStreams implements AutoCloseable { log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes."); } final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); + for (SinkNode sn : globalTaskTopology.sinks()) { + sn.getKeySer().configure(config.originals(), true); + sn.getValueSer().configure(config.originals(), false); + } + for (SourceNode sn : globalTaskTopology.sources()) { + sn.getKeyDeSer().configure(config.originals(), true); + sn.getValueDeSer().configure(config.originals(), false); + } + final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1)); final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() || (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 73bffc80e..225b9965f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -44,6 +44,12 @@ public class SinkNode extends ProcessorNode { this.partitioner = partitioner; } + public Serializer getKeySer() { + return keySerializer; + } + public Serializer getValueSer() { + return valSerializer; + } /** * @throws UnsupportedOperationException if this method adds a child to a sink node */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 87505cab2..5824d900b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -52,6 +52,13 @@ public class SourceNode extends ProcessorNode { this(name, topics, null, keyDeserializer, valDeserializer); } + public Deserializer getKeyDeSer() { + return keyDeserializer; + } + public Deserializer getValueDeSer() { + return valDeserializer; + } + K deserializeKey(final String topic, final Headers headers, final byte[] data) { return keyDeserializer.deserialize(topic, headers, data); }