diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 0ddcbc5bd..4e97aea01 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3548,6 +3548,7 @@ val clicksPerRegion: KTable[String, Long] =
clicksPerRegion.toStream.to(outputTopic)
A complete example of user-defined SerDes can be found in a test class within the library.
+                    User-defined SerDes are automatically configured when the <code>KafkaStreams</code> instance is constructed.
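For illustration, a minimal sketch of the kind of user-defined serializer this change targets. The class name UpperCaseSerializer and the config key my.serializer.uppercase are hypothetical; the point is only the configure() hook, which the patch now invokes with the application's properties at KafkaStreams construction time.

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import org.apache.kafka.common.serialization.Serializer;

    // Hypothetical serializer that only behaves correctly after configure() has been called.
    public class UpperCaseSerializer implements Serializer<String> {
        private boolean upperCase;

        @Override
        public void configure(final Map<String, ?> configs, final boolean isKey) {
            // configs carries the application's original properties (StreamsConfig#originals())
            upperCase = "true".equals(String.valueOf(configs.get("my.serializer.uppercase")));
        }

        @Override
        public byte[] serialize(final String topic, final String data) {
            if (data == null) {
                return null;
            }
            final String value = upperCase ? data.toUpperCase() : data;
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }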
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 4f15deadb..8dfe6ec32 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -49,6 +49,8 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.SinkNode;
+import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
@@ -634,6 +636,25 @@ public class KafkaStreams implements AutoCloseable {
this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
}
+ @SuppressWarnings("unchecked")
+    private void configureSerDe(final Set<SinkNode> sinks, final Set<SourceNode> sources) {
+ for (final SinkNode sn : sinks) {
+ if (sn.getKeySer() != null) {
+ sn.getKeySer().configure(config.originals(), true);
+ }
+ if (sn.getValueSer() != null) {
+ sn.getValueSer().configure(config.originals(), false);
+ }
+ }
+ for (final SourceNode sn : sources) {
+ if (sn.getKeyDeSer() != null) {
+ sn.getKeyDeSer().configure(config.originals(), true);
+ }
+ if (sn.getValueDeSer() != null) {
+ sn.getValueDeSer().configure(config.originals(), false);
+ }
+ }
+ }
private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier,
@@ -670,6 +691,7 @@ public class KafkaStreams implements AutoCloseable {
// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
final ProcessorTopology taskTopology = internalTopologyBuilder.build();
+ configureSerDe(taskTopology.sinks(), taskTopology.sources());
streamsMetadataState = new StreamsMetadataState(
internalTopologyBuilder,
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
@@ -683,6 +705,7 @@ public class KafkaStreams implements AutoCloseable {
log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
}
final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+
final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
(globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
@@ -695,7 +718,12 @@ public class KafkaStreams implements AutoCloseable {
final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
GlobalStreamThread.State globalThreadState = null;
if (globalTaskTopology != null) {
+ configureSerDe(globalTaskTopology.sinks(), globalTaskTopology.sources());
+
final String globalThreadId = clientId + "-GlobalStreamThread";
globalStreamThread = new GlobalStreamThread(globalTaskTopology,
config,
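To make the wiring above concrete, a minimal sketch of an application whose sink and source serializers/deserializers would now be configured from the supplied properties when the KafkaStreams instance is created. Topic names and the custom property key are illustrative only.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;

    public class SerdeConfigDemo {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "serde-config-demo");   // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // illustrative
            props.put("my.serializer.uppercase", "true");                          // custom key a serde can read in configure()

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
                   .to("output", Produced.with(Serdes.String(), Serdes.String()));

            // configureSerDe() walks the topology's sink and source nodes during construction
            // and calls Serializer#configure / Deserializer#configure with config.originals().
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
        }
    }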
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index df53ee2f2..3e3d9a5b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
/**
@@ -61,7 +62,7 @@ public interface StateStore {
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
- void init(ProcessorContext context, StateStore root);
+ void init(ProcessorContext context, StateStore root, StreamsConfig config);
/**
* Flush any cached data
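Because init() is a public interface method, every custom StateStore implementation has to adopt the widened signature. A minimal sketch under that assumption; the store below is hypothetical and does nothing beyond registering itself with the context.

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.StateStore;

    // Hypothetical no-op store showing the new three-argument init().
    public class NoOpStore implements StateStore {
        private volatile boolean open;

        @Override
        public String name() {
            return "no-op-store";
        }

        @Override
        public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) {
            // the new parameter exposes the application's original properties, e.g. for configuring store serdes
            context.register(root, (key, value) -> { });
            open = true;
        }

        @Override
        public void flush() { }

        @Override
        public void close() {
            open = false;
        }

        @Override
        public boolean persistent() {
            return false;
        }

        @Override
        public boolean isOpen() {
            return open;
        }
    }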
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index c30646884..363c3c123 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -42,6 +43,7 @@ abstract class AbstractStateManager implements StateManager {
final File baseDir;
final boolean eosEnabled;
+ StreamsConfig config;
OffsetCheckpoint checkpoint;
final Map checkpointableOffsets = new HashMap<>();
@@ -110,7 +112,7 @@ abstract class AbstractStateManager implements StateManager {
throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException);
}
- stateStore.init(processorContext, stateStore);
+ stateStore.init(processorContext, stateStore, config);
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 33878017b..165720580 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -52,6 +52,7 @@ public abstract class AbstractTask implements Task {
final Logger log;
final LogContext logContext;
final StateDirectory stateDirectory;
+ final StreamsConfig config;
boolean taskInitialized;
boolean taskClosed;
@@ -70,6 +71,7 @@ public abstract class AbstractTask implements Task {
final boolean isStandby,
final StateDirectory stateDirectory,
final StreamsConfig config) {
+ this.config = config;
this.id = id;
this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
this.partitions = new HashSet<>(partitions);
@@ -231,7 +233,7 @@ public abstract class AbstractTask implements Task {
for (final StateStore store : topology.stateStores()) {
log.trace("Initializing store {}", store.name());
processorContext.uninitialize();
- store.init(processorContext, store);
+ store.init(processorContext, store, config);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 768281428..d20da8eb0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -80,6 +80,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
}
}
+ this.config = config;
this.log = logContext.logger(GlobalStateManagerImpl.class);
this.topology = topology;
this.globalConsumer = globalConsumer;
@@ -119,7 +120,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
        final List<StateStore> stateStores = topology.globalStateStores();
for (final StateStore stateStore : stateStores) {
globalStoreNames.add(stateStore.name());
- stateStore.init(processorContext, stateStore);
+ stateStore.init(processorContext, stateStore, config);
}
return Collections.unmodifiableSet(globalStoreNames);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 5f32a3b7a..8cae6682e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -237,7 +237,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
@Override
public void init(final ProcessorContext context,
- final StateStore root) {
+ final StateStore root,
+ final StreamsConfig config) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@@ -436,7 +437,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
@Override
public void init(final ProcessorContext context,
- final StateStore root) {
+ final StateStore root,
+ final StreamsConfig config) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 73bffc80e..703f11618 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -44,6 +44,14 @@ public class SinkNode extends ProcessorNode {
this.partitioner = partitioner;
}
+ public Serializer getKeySer() {
+ return keySerializer;
+ }
+
+ public Serializer getValueSer() {
+ return valSerializer;
+ }
+
/**
* @throws UnsupportedOperationException if this method adds a child to a sink node
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 87505cab2..c17921411 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -52,6 +52,14 @@ public class SourceNode extends ProcessorNode {
this(name, topics, null, keyDeserializer, valDeserializer);
}
+ public Deserializer getKeyDeSer() {
+ return keyDeserializer;
+ }
+
+ public Deserializer getValueDeSer() {
+ return valDeserializer;
+ }
+
K deserializeKey(final String topic, final Headers headers, final byte[] data) {
return keyDeserializer.deserialize(topic, headers, data);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index bb347de9d..a08fc3ebb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -49,9 +50,10 @@ class CachingKeyValueStore
@Override
public void init(final ProcessorContext context,
- final StateStore root) {
+ final StateStore root,
+ final StreamsConfig config) {
initInternal(context);
- super.init(context, root);
+ super.init(context, root, config);
// save the stream thread as we only ever want to trigger a flush
// when the stream thread is the current thread.
streamThread = Thread.currentThread();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index edea7e0d4..149b7587e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -47,9 +48,9 @@ class CachingSessionStore
}
@Override
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) {
initInternal((InternalProcessorContext) context);
- super.init(context, root);
+ super.init(context, root, config);
}
@SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 0edd8f265..e5f730026 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -54,9 +55,10 @@ class CachingWindowStore
}
@Override
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context, final StateStore root,
+ final StreamsConfig config) {
initInternal((InternalProcessorContext) context);
- super.init(context, root);
+ super.init(context, root, config);
}
@SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index a924af600..59018946a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -40,8 +41,9 @@ public class ChangeLoggingKeyValueBytesStore
@Override
public void init(final ProcessorContext context,
- final StateStore root) {
- super.init(context, root);
+ final StateStore root,
+ final StreamsConfig config) {
+ super.init(context, root, config);
final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
changeLogger = new StoreChangeLogger<>(
name(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index 361f8a53b..bcab8a4d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -41,8 +42,8 @@ class ChangeLoggingSessionBytesStore
}
@Override
- public void init(final ProcessorContext context, final StateStore root) {
- super.init(context, root);
+ public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) {
+ super.init(context, root, config);
final String topic = ProcessorStateManager.storeChangelogTopic(
context.applicationId(),
name());
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index c58e9f09e..df072f817 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -49,9 +50,10 @@ class ChangeLoggingWindowBytesStore
@Override
public void init(final ProcessorContext context,
- final StateStore root) {
+ final StateStore root,
+ final StreamsConfig config) {
this.context = context;
- super.init(context, root);
+ super.init(context, root, config);
final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
changeLogger = new StoreChangeLogger<>(
name(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index b37c39e3b..918925f10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -47,7 +48,8 @@ public class InMemoryKeyValueStore implements KeyValueStore {
@Override
public void init(final ProcessorContext context,
- final StateStore root) {
+ final StateStore root,
+ final StreamsConfig config) {
if (root != null) {
// register the store
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index 65f53880b..a24d44630 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -182,7 +183,7 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf
}
@Override
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) {
final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;
bufferSizeSensor = Sensors.createBufferSizeSensor(this, internalProcessorContext);
bufferCountSensor = Sensors.createBufferCountSensor(this, internalProcessorContext);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 7d1b279fb..fd4d264d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -90,7 +91,7 @@ public class InMemoryWindowStore implements WindowStore {
@Override
@SuppressWarnings("unchecked")
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) {
this.context = (InternalProcessorContext) context;
final StreamsMetricsImpl metrics = this.context.metrics();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index d69df13f4..0f4882561 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -74,7 +75,7 @@ public class MemoryLRUCache implements KeyValueStore {
}
@Override
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) {
// register the store
context.register(root, (key, value) -> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 51da3ed1b..23918e1ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -80,7 +81,8 @@ public class MeteredKeyValueStore
@Override
public void init(final ProcessorContext context,
- final StateStore root) {
+ final StateStore root,
+ final StreamsConfig config) {
metrics = (StreamsMetricsImpl) context.metrics();
taskName = context.taskId().toString();
@@ -104,12 +106,12 @@ public class MeteredKeyValueStore
if (restoreTime.shouldRecord()) {
measureLatency(
() -> {
- super.init(context, root);
+ super.init(context, root, config);
return null;
},
restoreTime);
} else {
- super.init(context, root);
+ super.init(context, root, config);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 4631601b1..0fde94189 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -67,8 +68,15 @@ public class MeteredSessionStore
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context,
- final StateStore root) {
+ final StateStore root,
+ final StreamsConfig config) {
//noinspection unchecked
+ if (keySerde != null) {
+ keySerde.configure(config.originals(), true);
+ }
+ if (valueSerde != null) {
+ valueSerde.configure(config.originals(), false);
+ }
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde) context.keySerde() : keySerde,
@@ -89,7 +97,7 @@ public class MeteredSessionStore
// register and possibly restore the state from the logs
final long startNs = time.nanoseconds();
try {
- super.init(context, root);
+ super.init(context, root, config);
} finally {
metrics.recordLatency(
restoreTime,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
index 1c104915c..b92fbdc68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.StateSerdes;
@@ -49,7 +50,7 @@ class MeteredTimestampedWindowStore
@SuppressWarnings("unchecked")
@Override
- void initStoreSerde(final ProcessorContext context) {
+ void initStoreSerde(final ProcessorContext context, final StreamsConfig config) {
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde) context.keySerde() : keySerde,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 6d2eaab3d..184b97ec4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -69,9 +70,10 @@ public class MeteredWindowStore
@Override
public void init(final ProcessorContext context,
- final StateStore root) {
+ final StateStore root,
+ final StreamsConfig config) {
this.context = context;
- initStoreSerde(context);
+ initStoreSerde(context, config);
metrics = (StreamsMetricsImpl) context.metrics();
taskName = context.taskId().toString();
@@ -87,7 +89,7 @@ public class MeteredWindowStore
// register and possibly restore the state from the logs
final long startNs = time.nanoseconds();
try {
- super.init(context, root);
+ super.init(context, root, config);
} finally {
metrics.recordLatency(
restoreTime,
@@ -98,7 +100,13 @@ public class MeteredWindowStore
}
@SuppressWarnings("unchecked")
- void initStoreSerde(final ProcessorContext context) {
+ void initStoreSerde(final ProcessorContext context, final StreamsConfig config) {
+ if (keySerde != null) {
+ keySerde.configure(config.originals(), true);
+ }
+ if (valueSerde != null) {
+ valueSerde.configure(config.originals(), false);
+ }
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde) context.keySerde() : keySerde,
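The practical effect of the initStoreSerde() changes above: serdes handed to a windowed or session store via Materialized, not just topic-level serdes, now receive a configure() call with the application's properties before the StateSerdes is assembled. A small usage sketch (topic and store names are illustrative):

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.state.WindowStore;

    public class WindowedSerdeDemo {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KTable<Windowed<String>, Long> counts = builder
                .stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("click-counts")
                    .withKeySerde(Serdes.String())    // a user-defined Serde supplied here is now configure()d as well
                    .withValueSerde(Serdes.Long()));
        }
    }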
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index f733c80e6..a5192c671 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -148,7 +149,7 @@ public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
}
@Override
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) {
this.context = (InternalProcessorContext) context;
final StreamsMetricsImpl metrics = this.context.metrics();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 2ca3ad390..3c5ecd873 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -187,7 +187,8 @@ public class RocksDBStore implements KeyValueStore {
}
public void init(final ProcessorContext context,
- final StateStore root) {
+ final StateStore root,
+ final StreamsConfig config) {
// open the DB dir
internalProcessorContext = context;
openDB(context);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 3b634ebea..614effceb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -43,9 +44,10 @@ public class RocksDBWindowStore
}
@Override
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context, final StateStore root,
+ final StreamsConfig config) {
this.context = context;
- super.init(context, root);
+ super.init(context, root, config);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index 62dc8e059..9801a4138 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.TimestampedBytesStore;
/**
@@ -44,8 +45,9 @@ public abstract class WrappedStateStore implements S
@Override
public void init(final ProcessorContext context,
- final StateStore root) {
- wrapped.init(context, root);
+ final StateStore root,
+ final StreamsConfig config) {
+ wrapped.init(context, root, config);
}
@SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 6b8b5b529..ab50740ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -476,7 +476,63 @@ public class KafkaStreamsTest {
globalStreams.metadataForKey("store", "key", (topic, key, value, numPartitions) -> 0);
}
+ static class MyStringSerializer extends StringSerializer {
+ boolean configured = false;
+ boolean called = false;
+
+ @Override
+ public void configure(final Map configs, final boolean isKey) {
+ super.configure(configs, isKey);
+ configured = true;
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final String data) {
+ called = true;
+ return super.serialize(topic, data);
+ }
+
+ boolean configured() {
+ return !called || configured;
+ }
+ }
+ static class MyStringDeserializer extends StringDeserializer {
+ boolean configured = false;
+ boolean called = false;
+
+ @Override
+ public void configure(final Map configs, final boolean isKey) {
+ super.configure(configs, isKey);
+ configured = true;
+ }
+ @Override
+ public String deserialize(final String topic, final byte[] data) {
+ called = true;
+ return super.deserialize(topic, data);
+ }
+
+ boolean configured() {
+ return !called || configured;
+ }
+ }
+
+ public static class MyStringSerde extends Serdes.WrapperSerde {
+ public MyStringSerde() {
+ super(new MyStringSerializer(), new MyStringDeserializer());
+ }
+
+ @Override
+ public void configure(final Map configs, final boolean isKey) {
+ super.configure(configs, isKey);
+ }
+        public boolean configured() {
+            return ((MyStringSerializer) serializer()).configured()
+                && ((MyStringDeserializer) deserializer()).configured();
+        }
+ }
@Test
+ @SuppressWarnings("unchecked")
public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
final AtomicBoolean keepRunning = new AtomicBoolean(true);
KafkaStreams streams = null;
@@ -486,7 +542,9 @@ public class KafkaStreamsTest {
final String topic = "input";
CLUSTER.createTopics(topic);
- builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ final MyStringSerde keyTestSerde = new MyStringSerde();
+ final MyStringSerde valueTestSerde = new MyStringSerde();
+ builder.stream(topic, Consumed.with(keyTestSerde, valueTestSerde))
.foreach((key, value) -> {
try {
latch.countDown();
@@ -510,6 +568,8 @@ public class KafkaStreamsTest {
assertTrue("Timed out waiting to receive single message", latch.await(30, TimeUnit.SECONDS));
assertFalse(streams.close(Duration.ofMillis(10)));
+ assertTrue(keyTestSerde.configured());
+ assertTrue(valueTestSerde.configured());
} finally {
// stop the thread so we don't interfere with other tests etc
keepRunning.set(false);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 7493b0691..0a2317d7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -19,12 +19,14 @@ package org.apache.kafka.streams.integration;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreamsTest.MyStringSerde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@@ -39,6 +41,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
@@ -51,9 +54,11 @@ import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import static java.time.Duration.ofMillis;
+import static org.junit.Assert.assertTrue;
/**
* Similar to KStreamAggregationIntegrationTest but with dedupping enabled
@@ -111,17 +116,22 @@ public class KStreamAggregationDedupIntegrationTest {
@Test
+ @SuppressWarnings("unchecked")
public void shouldReduce() throws Exception {
produceMessages(System.currentTimeMillis());
+ final MyStringSerde keyTestSerde = new MyStringSerde();
+ final MyStringSerde valueTestSerde = new MyStringSerde();
groupedStream
.reduce(reducer, Materialized.as("reduce-by-key"))
.toStream()
- .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+ .to(outputTopic, Produced.with(keyTestSerde, valueTestSerde));
startStreams();
produceMessages(System.currentTimeMillis());
+ assertTrue(keyTestSerde.configured());
+ assertTrue(valueTestSerde.configured());
validateReceivedMessages(
new StringDeserializer(),
new StringDeserializer(),
@@ -170,16 +180,79 @@ public class KStreamAggregationDedupIntegrationTest {
);
}
+ static class MyIntSerializer extends IntegerSerializer {
+ boolean configured = false;
+ boolean called = false;
+
+ MyIntSerializer() {
+ super();
+ }
+
+ @Override
+ public void configure(final Map configs, final boolean isKey) {
+ super.configure(configs, isKey);
+ configured = true;
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Integer data) {
+ called = true;
+ return super.serialize(topic, data);
+ }
+
+ boolean configured() {
+ return !called || configured;
+ }
+ }
+ static class MyIntDeserializer extends IntegerDeserializer {
+ boolean configured = false;
+ boolean called = false;
+
+ @Override
+ public void configure(final Map configs, final boolean isKey) {
+ super.configure(configs, isKey);
+ configured = true;
+ }
+ @Override
+ public Integer deserialize(final String topic, final byte[] data) {
+ called = true;
+ return super.deserialize(topic, data);
+ }
+
+ boolean configured() {
+ return !called || configured;
+ }
+ }
+
+ class MyIntegerSerde extends Serdes.WrapperSerde {
+ public MyIntegerSerde() {
+ super(new MyIntSerializer(), new MyIntDeserializer());
+ }
+ @Override
+ public void configure(final Map configs, final boolean isKey) {
+ super.configure(configs, isKey);
+ }
+
+        public boolean configured() {
+            return ((MyIntSerializer) serializer()).configured()
+                && ((MyIntDeserializer) deserializer()).configured();
+        }
+ }
@Test
+ @SuppressWarnings("unchecked")
public void shouldGroupByKey() throws Exception {
final long timestamp = mockTime.milliseconds();
produceMessages(timestamp);
produceMessages(timestamp);
- stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
+ final MyIntegerSerde keyTestSerde = new MyIntegerSerde();
+ final MyStringSerde valueTestSerde = new MyStringSerde();
+ stream.groupByKey(Grouped.with(keyTestSerde, valueTestSerde))
.windowedBy(TimeWindows.of(ofMillis(500L)))
.count(Materialized.as("count-windows"))
- .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start())
+ .toStream((windowedKey, value) -> ((Windowed)windowedKey).key() + "@" +
+ ((Windowed)windowedKey).window().start())
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
startStreams();
@@ -197,6 +270,8 @@ public class KStreamAggregationDedupIntegrationTest {
KeyValue.pair("5@" + window, 2L)
)
);
+ assertTrue(keyTestSerde.configured());
+ assertTrue(valueTestSerde.configured());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 4038bfd68..cf3fc9eb2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -83,18 +83,19 @@ public class KStreamSessionWindowAggregateProcessorTest {
private SessionStore sessionStore;
private InternalMockProcessorContext context;
private Metrics metrics;
+ private StreamsConfig config;
@Before
public void initializeStore() {
final File stateDir = TestUtils.tempDirectory();
metrics = new Metrics();
+ config = new StreamsConfig(StreamsTestUtils.getStreamsConfig());
final MockStreamsMetrics metrics = new MockStreamsMetrics(KStreamSessionWindowAggregateProcessorTest.this.metrics);
context = new InternalMockProcessorContext(
stateDir,
Serdes.String(),
Serdes.String(),
- metrics,
- new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            metrics,
+            config,
NoOpRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 100000, metrics)
) {
@@ -104,11 +105,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
};
- initStore(true);
+ initStore(true, config);
processor.init(context);
}
- private void initStore(final boolean enableCaching) {
+    private void initStore(final boolean enableCaching, final StreamsConfig config) {
        final StoreBuilder<SessionStore<String, Long>> storeBuilder =
Stores.sessionStoreBuilder(
Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)),
@@ -121,7 +122,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
sessionStore = storeBuilder.build();
- sessionStore.init(context, sessionStore);
+ sessionStore.init(context, sessionStore, config);
}
@After
@@ -273,7 +274,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
@Test
public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() {
- initStore(false);
+ initStore(false, config);
processor.init(context);
context.setTime(0);
@@ -293,7 +294,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
@Test
public void shouldImmediatelyForwardRemovedSessionsWhenMerging() {
- initStore(false);
+ initStore(false, config);
processor.init(context);
context.setTime(0);
@@ -313,7 +314,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
@Test
public void shouldLogAndMeterWhenSkippingNullKey() {
- initStore(false);
+ initStore(false, config);
processor.init(context);
context.setRecordContext(new ProcessorRecordContext(-1, -2, -3, "topic", null));
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
@@ -340,7 +341,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
sessionMerger
).get();
- initStore(false);
+ initStore(false, config);
processor.init(context);
// dummy record to advance stream time
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 228cfc8d9..598cae4ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -151,7 +151,7 @@ public class KTableSuppressProcessorMetricsTest {
final MockInternalProcessorContext context = new MockInternalProcessorContext();
context.setCurrentNode(new ProcessorNode("testNode"));
- buffer.init(context, buffer);
+ buffer.init(context, buffer, null);
processor.init(context);
final long timestamp = 100L;
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 354fdd5e9..eebfb7273 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -87,7 +87,7 @@ public class KTableSuppressProcessorTest {
final MockInternalProcessorContext context = new MockInternalProcessorContext();
context.setCurrentNode(new ProcessorNode("testNode"));
- buffer.init(context, buffer);
+ buffer.init(context, buffer, null);
processor.init(context);
this.processor = processor;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index b36557cda..a7e225dfc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -152,7 +152,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForKeyValueStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
try {
- store.init(null, null);
+ store.init(null, null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -161,7 +161,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForTimestampedKeyValueStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
try {
- store.init(null, null);
+ store.init(null, null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -170,7 +170,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForWindowStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
try {
- store.init(null, null);
+ store.init(null, null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -179,7 +179,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForTimestampedWindowStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
try {
- store.init(null, null);
+ store.init(null, null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
@@ -188,7 +188,7 @@ public class GlobalProcessorContextImplTest {
public void shouldNotAllowInitForSessionStore() {
final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
try {
- store.init(null, null);
+ store.init(null, null, null);
fail("Should have thrown UnsupportedOperationException.");
} catch (final UnsupportedOperationException expected) { }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index fe4d94880..5f9b076b6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -547,7 +547,7 @@ public class ProcessorContextImplTest {
assertTrue(store.persistent());
assertTrue(store.isOpen());
- checkThrowsUnsupportedOperation(() -> store.init(null, null), "init()");
+ checkThrowsUnsupportedOperation(() -> store.init(null, null, null), "init()");
checkThrowsUnsupportedOperation(store::close, "close()");
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
index 34d9050c7..55dbdf986 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state;
import java.time.Instant;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
@@ -59,7 +60,7 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore {
}
@Override
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) {
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 71e15d46d..0e8fa03e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -75,7 +75,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
context = new InternalMockProcessorContext(null, null, null, null, cache);
topic = "topic";
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic, null));
- store.init(context, null);
+ store.init(context, null, null);
}
@After
@@ -93,7 +93,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
.withCachingEnabled();
final KeyValueStore store = (KeyValueStore) storeBuilder.build();
- store.init(context, store);
+ store.init(context, store, null);
return store;
}
@@ -126,7 +126,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
cache = EasyMock.niceMock(ThreadCache.class);
context = new InternalMockProcessorContext(null, null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic, null));
- store.init(context, null);
+ store.init(context, null, null);
cache.flush("0_0-store");
EasyMock.expectLastCall().andThrow(new NullPointerException("Simulating an error on flush"));
EasyMock.replay(cache);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index 66b27f0a2..8b9515cb3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -81,7 +81,7 @@ public class CachingSessionStoreTest {
cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic", null));
- cachingStore.init(context, cachingStore);
+ cachingStore.init(context, cachingStore, null);
}
@After
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index da49dded4..b8bc016c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -99,7 +99,7 @@ public class CachingWindowStoreTest {
topic = "topic";
context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic, null));
- cachingStore.init(context, cachingStore);
+ cachingStore.init(context, cachingStore, null);
}
@After
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 11f7c4c76..da2dbf14e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -72,7 +72,7 @@ public class ChangeLoggingKeyValueBytesStoreTest {
collector,
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
context.setTime(0);
- store.init(context, store);
+ store.init(context, store, null);
}
@After
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 94eee76f6..cc0b18962 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -77,11 +77,11 @@ public class ChangeLoggingSessionBytesStoreTest {
private void init() {
EasyMock.expect(context.taskId()).andReturn(taskId);
EasyMock.expect(context.recordCollector()).andReturn(collector);
- inner.init(context, store);
+ inner.init(context, store, null);
EasyMock.expectLastCall();
EasyMock.replay(inner, context);
- store.init(context, store);
+ store.init(context, store, null);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
index 2c19c48aa..97530bdc7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java
@@ -77,7 +77,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest {
collector,
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
context.setTime(0);
- store.init(context, store);
+ store.init(context, store, null);
}
@After
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index edf210eae..a1f09d6b3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -78,11 +78,11 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
private void init() {
EasyMock.expect(context.taskId()).andReturn(taskId);
EasyMock.expect(context.recordCollector()).andReturn(collector);
- inner.init(context, store);
+ inner.init(context, store, null);
EasyMock.expectLastCall();
EasyMock.replay(inner, context);
- store.init(context, store);
+ store.init(context, store, null);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index d7ad6d22d..99a5b67b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -75,11 +75,11 @@ public class ChangeLoggingWindowBytesStoreTest {
private void init() {
EasyMock.expect(context.taskId()).andReturn(taskId);
EasyMock.expect(context.recordCollector()).andReturn(collector);
- inner.init(context, store);
+ inner.init(context, store, null);
EasyMock.expectLastCall();
EasyMock.replay(inner, context);
- store.init(context, store);
+ store.init(context, store, null);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
index 7ca0edbce..246d76cf3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java
@@ -76,7 +76,7 @@ public class CompositeReadOnlyKeyValueStoreTest {
store.init(new InternalMockProcessorContext(new StateSerdes<>(ProcessorStateManager.storeChangelogTopic("appId", storeName), Serdes.String(), Serdes.String()),
new NoOpRecordCollector()),
- store);
+ store, null);
return store;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
index 0848970cd..ed4ed3136 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java
@@ -37,7 +37,7 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest {
.withLoggingEnabled(Collections.singletonMap("retention.ms", "1000"));
final StateStore store = storeBuilder.build();
- store.init(context, store);
+ store.init(context, store, null);
return (KeyValueStore) store;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index ef5d6dcfb..66e81441b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -39,7 +39,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
(Serde) context.valueSerde());
final StateStore store = storeBuilder.build();
- store.init(context, store);
+ store.init(context, store, null);
return (KeyValueStore) store;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 1bd404599..054378b8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -46,7 +46,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
(Serde) context.valueSerde());
final StateStore store = storeBuilder.build();
- store.init(context, store);
+ store.init(context, store, null);
return (KeyValueStore) store;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index e7f5ed068..a60f46729 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -110,7 +110,7 @@ public class InMemoryWindowStoreTest {
Serdes.Integer(),
Serdes.String()).build();
- store.init(context, store);
+ store.init(context, store, null);
return store;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index b8fc88e62..c6799c07b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -96,7 +96,7 @@ public class MeteredKeyValueStoreTest {
private void init() {
replay(inner, context);
- metered.init(context, metered);
+ metered.init(context, metered, null);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index b349f178a..b9035458c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -95,7 +95,7 @@ public class MeteredSessionStoreTest {
private void init() {
replay(inner, context);
- metered.init(context, metered);
+ metered.init(context, metered, null);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
index a3522f311..91055b294 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
@@ -75,7 +75,7 @@ public class MeteredTimestampWindowStoreTest {
EasyMock.expectLastCall();
EasyMock.replay(innerStoreMock);
- store.init(context, store);
+ store.init(context, store, null);
store.close();
EasyMock.verify(innerStoreMock);
}
@@ -85,7 +85,7 @@ public class MeteredTimestampWindowStoreTest {
EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null);
EasyMock.replay(innerStoreMock);
- store.init(context, store);
+ store.init(context, store, null);
assertNull(store.fetch("a", 0));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 0f60d2494..717805891 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -99,7 +99,7 @@ public class MeteredTimestampedKeyValueStoreTest {
private void init() {
replay(inner, context);
- metered.init(context, metered);
+ metered.init(context, metered, null);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 962888ae9..4bd5cfaaa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -59,6 +59,7 @@ public class MeteredWindowStoreTest {
private InternalMockProcessorContext context;
@SuppressWarnings("unchecked")
private final WindowStore innerStoreMock = createNiceMock(WindowStore.class);
+ private StreamsConfig config;
private final MeteredWindowStore store = new MeteredWindowStore<>(
innerStoreMock,
10L, // any size
@@ -77,12 +78,13 @@ public class MeteredWindowStoreTest {
public void setUp() {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+ config = new StreamsConfig(StreamsTestUtils.getStreamsConfig());
context = new InternalMockProcessorContext(
TestUtils.tempDirectory(),
Serdes.String(),
Serdes.Long(),
streamsMetrics,
- new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+ config,
NoOpRecordCollector::new,
new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
);
@@ -91,7 +93,7 @@ public class MeteredWindowStoreTest {
@Test
public void testMetrics() {
replay(innerStoreMock);
- store.init(context, store);
+ store.init(context, store, config);
final JmxReporter reporter = new JmxReporter("kafka.streams");
metrics.addReporter(reporter);
assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s",
@@ -102,10 +104,10 @@ public class MeteredWindowStoreTest {
@Test
public void shouldRecordRestoreLatencyOnInit() {
- innerStoreMock.init(context, store);
+ innerStoreMock.init(context, store, config);
expectLastCall();
replay(innerStoreMock);
- store.init(context, store);
+ store.init(context, store, config);
final Map metrics = context.metrics().metrics();
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
@@ -118,7 +120,7 @@ public class MeteredWindowStoreTest {
expectLastCall();
replay(innerStoreMock);
- store.init(context, store);
+ store.init(context, store, config);
store.put("a", "a");
final Map metrics = context.metrics().metrics();
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
@@ -131,7 +133,7 @@ public class MeteredWindowStoreTest {
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyWindowStoreIterator());
replay(innerStoreMock);
- store.init(context, store);
+ store.init(context, store, config);
store.fetch("a", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
final Map metrics = context.metrics().metrics();
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
@@ -144,7 +146,7 @@ public class MeteredWindowStoreTest {
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators., byte[]>emptyIterator());
replay(innerStoreMock);
- store.init(context, store);
+ store.init(context, store, config);
store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close;
final Map metrics = context.metrics().metrics();
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
@@ -158,7 +160,7 @@ public class MeteredWindowStoreTest {
expectLastCall();
replay(innerStoreMock);
- store.init(context, store);
+ store.init(context, store, config);
store.flush();
final Map metrics = context.metrics().metrics();
assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
@@ -172,7 +174,7 @@ public class MeteredWindowStoreTest {
expectLastCall();
replay(innerStoreMock);
- store.init(context, store);
+ store.init(context, store, config);
store.close();
verify(innerStoreMock);
}
@@ -182,7 +184,7 @@ public class MeteredWindowStoreTest {
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null);
replay(innerStoreMock);
- store.init(context, store);
+ store.init(context, store, config);
assertNull(store.fetch("a", 0));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
index aad740387..e228b2e34 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.Windowed;
@@ -251,7 +252,7 @@ public class ReadOnlyWindowStoreStub<K, V> implements ReadOnlyWindowStore<K, V>, StateStore {
}
@Override
- public void init(final ProcessorContext context, final StateStore root) {}
+ public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) {}
@Override
public void flush() {}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index bdf0379dd..3db0d6580 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -46,7 +46,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
(Serde) context.valueSerde());
final StateStore store = storeBuilder.build();
- store.init(context, store);
+ store.init(context, store, null);
return (KeyValueStore) store;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index d0dd13320..b2d9d0e32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -133,7 +133,7 @@ public class RocksDBSegmentedBytesStoreTest {
new NoOpRecordCollector(),
new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
);
- bytesStore.init(context, bytesStore);
+ bytesStore.init(context, bytesStore, null);
}
@After
@@ -298,7 +298,7 @@ public class RocksDBSegmentedBytesStoreTest {
schema
);
- bytesStore.init(context, bytesStore);
+ bytesStore.init(context, bytesStore, null);
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
assertThat(
results,
@@ -335,7 +335,7 @@ public class RocksDBSegmentedBytesStoreTest {
schema
);
- bytesStore.init(context, bytesStore);
+ bytesStore.init(context, bytesStore, null);
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L));
assertThat(
results,
@@ -355,7 +355,7 @@ public class RocksDBSegmentedBytesStoreTest {
// need to create a segment so we can attempt to write to it again.
bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50));
bytesStore.close();
- bytesStore.init(context, bytesStore);
+ bytesStore.init(context, bytesStore, null);
bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(100));
}
@@ -400,7 +400,7 @@ public class RocksDBSegmentedBytesStoreTest {
@Test
public void shouldRespectBulkLoadOptionsDuringInit() {
- bytesStore.init(context, bytesStore);
+ bytesStore.init(context, bytesStore, null);
final String key = "a";
bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 0786c3723..e3f4a14dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -70,7 +70,7 @@ public class RocksDBSessionStoreTest {
0,
new MockStreamsMetrics(new Metrics())));
- sessionStore.init(context, sessionStore);
+ sessionStore.init(context, sessionStore, null);
}
@After
@@ -191,7 +191,7 @@ public class RocksDBSessionStoreTest {
Serdes.String(),
Serdes.Long()).build();
- sessionStore.init(context, sessionStore);
+ sessionStore.init(context, sessionStore, null);
sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L);
sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index db458eb77..6d4ef9790 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -64,6 +64,7 @@ public class RocksDBStoreTest {
private File dir;
private final Serializer stringSerializer = new StringSerializer();
private final Deserializer stringDeserializer = new StringDeserializer();
+ StreamsConfig config;
InternalMockProcessorContext context;
RocksDBStore rocksDBStore;
@@ -74,10 +75,11 @@ public class RocksDBStoreTest {
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class);
rocksDBStore = getRocksDBStore();
dir = TestUtils.tempDirectory();
+ config = new StreamsConfig(props);
context = new InternalMockProcessorContext(dir,
Serdes.String(),
Serdes.String(),
- new StreamsConfig(props));
+ config);
}
RocksDBStore getRocksDBStore() {
@@ -91,7 +93,7 @@ public class RocksDBStoreTest {
@Test
public void shouldRespectBulkloadOptionsDuringInit() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
final StateRestoreListener restoreListener = context.getRestoreListener(rocksDBStore.name());
@@ -110,7 +112,7 @@ public class RocksDBStoreTest {
@Test
public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
final String message = "how can a 4 ounce bird carry a 2lb coconut";
int intKey = 1;
@@ -171,7 +173,7 @@ public class RocksDBStoreTest {
new Bytes(stringSerializer.serialize(null, "3")),
stringSerializer.serialize(null, "c")));
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
rocksDBStore.putAll(entries);
rocksDBStore.flush();
@@ -194,7 +196,7 @@ public class RocksDBStoreTest {
@Test
public void shouldTogglePrepareForBulkloadSetting() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
final RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
(RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback;
@@ -209,7 +211,7 @@ public class RocksDBStoreTest {
public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
context.restore(rocksDBStore.name(), entries);
final RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
@@ -226,7 +228,7 @@ public class RocksDBStoreTest {
public void shouldRestoreAll() {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
context.restore(rocksDBStore.name(), entries);
assertEquals(
@@ -248,7 +250,7 @@ public class RocksDBStoreTest {
@Test
public void shouldPutOnlyIfAbsentValue() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
final Bytes keyBytes = new Bytes(stringSerializer.serialize(null, "one"));
final byte[] valueBytes = stringSerializer.serialize(null, "A");
final byte[] valueBytesUpdate = stringSerializer.serialize(null, "B");
@@ -265,7 +267,7 @@ public class RocksDBStoreTest {
final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
entries.add(new KeyValue<>("1".getBytes(UTF_8), null));
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
context.restore(rocksDBStore.name(), entries);
final KeyValueIterator iterator = rocksDBStore.all();
@@ -289,7 +291,7 @@ public class RocksDBStoreTest {
// this will restore key "1" as WriteBatch applies updates in order
entries.add(new KeyValue<>("1".getBytes(UTF_8), "restored".getBytes(UTF_8)));
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
context.restore(rocksDBStore.name(), entries);
final KeyValueIterator iterator = rocksDBStore.all();
@@ -322,7 +324,7 @@ public class RocksDBStoreTest {
public void shouldRestoreThenDeleteOnRestoreAll() {
final List> entries = getKeyValueEntries();
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
context.restore(rocksDBStore.name(), entries);
@@ -362,7 +364,7 @@ public class RocksDBStoreTest {
@Test
public void shouldThrowNullPointerExceptionOnNullPut() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
try {
rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
fail("Should have thrown NullPointerException on null put()");
@@ -373,7 +375,7 @@ public class RocksDBStoreTest {
@Test
public void shouldThrowNullPointerExceptionOnNullPutAll() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
try {
rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
fail("Should have thrown NullPointerException on null put()");
@@ -384,7 +386,7 @@ public class RocksDBStoreTest {
@Test
public void shouldThrowNullPointerExceptionOnNullGet() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
try {
rocksDBStore.get(null);
fail("Should have thrown NullPointerException on null get()");
@@ -395,7 +397,7 @@ public class RocksDBStoreTest {
@Test
public void shouldThrowNullPointerExceptionOnDelete() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
try {
rocksDBStore.delete(null);
fail("Should have thrown NullPointerException on deleting null key");
@@ -406,7 +408,7 @@ public class RocksDBStoreTest {
@Test
public void shouldThrowNullPointerExceptionOnRange() {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
try {
rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2")));
fail("Should have thrown NullPointerException on deleting null key");
@@ -417,7 +419,7 @@ public class RocksDBStoreTest {
@Test(expected = ProcessorStateException.class)
public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException {
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
Utils.delete(dir);
rocksDBStore.put(
new Bytes(stringSerializer.serialize(null, "anyKey")),
@@ -438,7 +440,7 @@ public class RocksDBStoreTest {
new StreamsConfig(props));
enableBloomFilters = false;
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
final List expectedValues = new ArrayList<>();
expectedValues.add("a");
@@ -463,7 +465,7 @@ public class RocksDBStoreTest {
// reopen with Bloom Filters enabled
// should open fine without errors
enableBloomFilters = true;
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
for (final KeyValue keyValue : keyValues) {
final byte[] valBytes = rocksDBStore.get(new Bytes(keyValue.key));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
index f49527b0d..d65f7174f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
@@ -53,7 +53,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
LogCaptureAppender.setClassLoggerToDebug(RocksDBTimestampedStore.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode"));
LogCaptureAppender.unregister(appender);
@@ -251,7 +251,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
// check that still in upgrade mode
LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode"));
LogCaptureAppender.unregister(appender);
rocksDBStore.close();
@@ -270,14 +270,14 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
// check that still in regular mode
appender = LogCaptureAppender.createAndRegister();
- rocksDBStore.init(context, rocksDBStore);
+ rocksDBStore.init(context, rocksDBStore, config);
assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode"));
LogCaptureAppender.unregister(appender);
}
private void prepareOldStore() {
final RocksDBStore keyValueStore = new RocksDBStore(DB_NAME);
- keyValueStore.init(context, keyValueStore);
+ keyValueStore.init(context, keyValueStore, config);
keyValueStore.put(new Bytes("key1".getBytes()), "1".getBytes());
keyValueStore.put(new Bytes("key2".getBytes()), "22".getBytes());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 42b1b8c37..04acad290 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -122,7 +122,7 @@ public class RocksDBWindowStoreTest {
Serdes.Integer(),
Serdes.String()).build();
- store.init(context, store);
+ store.init(context, store, null);
return store;
}
@@ -1311,7 +1311,7 @@ public class RocksDBWindowStoreTest {
Serdes.String(),
Serdes.String()).build();
- windowStore.init(context, windowStore);
+ windowStore.init(context, windowStore, null);
windowStore.put("a", "0001", 0);
windowStore.put("aa", "0002", 0);
@@ -1375,7 +1375,7 @@ public class RocksDBWindowStoreTest {
Serdes.Bytes(),
Serdes.String()).build();
- windowStore.init(context, windowStore);
+ windowStore.init(context, windowStore, null);
final Bytes key1 = Bytes.wrap(new byte[] {0});
final Bytes key2 = Bytes.wrap(new byte[] {0, 0});
diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
index a5e345efc..692d31602 100644
--- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
@@ -17,6 +17,7 @@
package org.apache.kafka.test;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -62,7 +63,7 @@ public class GenericInMemoryKeyValueStore
@SuppressWarnings("unchecked")
/* This is a "dummy" store used for testing;
it does not support restoring from changelog since we allow it to be serde-ignorant */
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) {
if (root != null) {
context.register(root, null);
}
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index b83936b8d..fd0d28021 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -111,7 +111,7 @@ public class KStreamTestDriver extends ExternalResource {
private void initTopology(final ProcessorTopology topology, final List stores) {
for (final StateStore store : stores) {
try {
- store.init(context, store);
+ store.init(context, store, null);
} catch (final RuntimeException e) {
new RuntimeException("Fatal exception initializing store.", e).printStackTrace();
throw e;
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
index 1729b2433..42df252d6 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.test;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -50,7 +51,8 @@ public class MockKeyValueStore implements KeyValueStore {
@Override
public void init(final ProcessorContext context,
- final StateStore root) {
+ final StateStore root,
+ final StreamsConfig config) {
context.register(root, stateRestoreCallback);
initialized = true;
closed = false;
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index 08945d504..403c70513 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.test;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -73,7 +74,7 @@ public class NoOpReadOnlyStore
}
@Override
- public void init(final ProcessorContext context, final StateStore root) {
+ public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) {
if (rocksdbStore) {
// cf. RocksDBStore
new File(context.stateDir() + File.separator + "rocksdb" + File.separator + name).mkdirs();
diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
index 39ff61475..83687081a 100644
--- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
@@ -17,6 +17,7 @@
package org.apache.kafka.test;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -97,7 +98,7 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V>, StateStore {
- public void init(final ProcessorContext context, final StateStore root) {}
+ public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) {}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/KeyValueStoreFacade.java
@@ … @@ public class KeyValueStoreFacade<K, V> extends ReadOnlyKeyValueStoreFacade<K, V> implements KeyValueStore<K, V> {
@Override
public void init(final ProcessorContext context,
- final StateStore root) {
- inner.init(context, root);
+ final StateStore root,
+ final StreamsConfig config) {
+ inner.init(context, root, config);
}
@Override
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
index f6f8f3361..9ac646d77 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -32,8 +33,9 @@ public class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K, V> implements WindowStore<K, V> {
@Override
public void init(final ProcessorContext context,
- final StateStore root) {
- inner.init(context, root);
+ final StateStore root,
+ final StreamsConfig config) {
+ inner.init(context, root, config);
}
@Override