diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html index 0ddcbc5bd..4e97aea01 100644 --- a/docs/streams/developer-guide/dsl-api.html +++ b/docs/streams/developer-guide/dsl-api.html @@ -3548,6 +3548,7 @@ val clicksPerRegion: KTable[String, Long] = clicksPerRegion.toStream.to(outputTopic)

A complete example of user-defined SerDes can be found in a test class within the library.

+

User-defined SerDes are configured automatically at KafkaStreams construction time.

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 4f15deadb..8dfe6ec32 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -49,6 +49,8 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.ProcessorTopology; +import org.apache.kafka.streams.processor.internals.SinkNode; +import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; @@ -634,6 +636,25 @@ public class KafkaStreams implements AutoCloseable { this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM); } + @SuppressWarnings("unchecked") + private void configureSerDe(final Set sinks, final Set sources) { + for (final SinkNode sn : sinks) { + if (sn.getKeySer() != null) { + sn.getKeySer().configure(config.originals(), true); + } + if (sn.getValueSer() != null) { + sn.getValueSer().configure(config.originals(), false); + } + } + for (final SourceNode sn : sources) { + if (sn.getKeyDeSer() != null) { + sn.getKeyDeSer().configure(config.originals(), true); + } + if (sn.getValueDeSer() != null) { + sn.getValueDeSer().configure(config.originals(), false); + } + } + } private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, final StreamsConfig config, final KafkaClientSupplier clientSupplier, @@ -670,6 +691,7 @@ public class KafkaStreams implements AutoCloseable { // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception 
final ProcessorTopology taskTopology = internalTopologyBuilder.build(); + configureSerDe(taskTopology.sinks(), taskTopology.sources()); streamsMetadataState = new StreamsMetadataState( internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); @@ -683,6 +705,7 @@ public class KafkaStreams implements AutoCloseable { log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes."); } final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); + final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1)); final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() || (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore()); @@ -695,7 +718,12 @@ public class KafkaStreams implements AutoCloseable { final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener(); GlobalStreamThread.State globalThreadState = null; + for (StateStore ss : internalTopologyBuilder.globalStateStores().values()) { + int i = 0; + } if (globalTaskTopology != null) { + configureSerDe(globalTaskTopology.sinks(), globalTaskTopology.sources()); + final String globalThreadId = clientId + "-GlobalStreamThread"; globalStreamThread = new GlobalStreamThread(globalTaskTopology, config, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index df53ee2f2..3e3d9a5b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; /** @@ -61,7 +62,7 @@ public interface StateStore { * @throws IllegalStateException If store gets 
registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - void init(ProcessorContext context, StateStore root); + void init(ProcessorContext context, StateStore root, StreamsConfig config); /** * Flush any cached data diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java index c30646884..363c3c123 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; @@ -42,6 +43,7 @@ abstract class AbstractStateManager implements StateManager { final File baseDir; final boolean eosEnabled; + StreamsConfig config; OffsetCheckpoint checkpoint; final Map checkpointableOffsets = new HashMap<>(); @@ -110,7 +112,7 @@ abstract class AbstractStateManager implements StateManager { throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException); } - stateStore.init(processorContext, stateStore); + stateStore.init(processorContext, stateStore, config); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 33878017b..165720580 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -52,6 +52,7 @@ public abstract class AbstractTask implements Task { final Logger log; final LogContext logContext; final StateDirectory stateDirectory; + final StreamsConfig config; boolean taskInitialized; boolean taskClosed; @@ -70,6 +71,7 @@ public abstract class AbstractTask implements Task { final boolean isStandby, final StateDirectory stateDirectory, final StreamsConfig config) { + this.config = config; this.id = id; this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); this.partitions = new HashSet<>(partitions); @@ -231,7 +233,7 @@ public abstract class AbstractTask implements Task { for (final StateStore store : topology.stateStores()) { log.trace("Initializing store {}", store.name()); processorContext.uninitialize(); - store.init(processorContext, store); + store.init(processorContext, store, config); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 768281428..d20da8eb0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -80,6 +80,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob } } + this.config = config; this.log = logContext.logger(GlobalStateManagerImpl.class); this.topology = topology; this.globalConsumer = globalConsumer; @@ -119,7 +120,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob final List stateStores = topology.globalStateStores(); for (final StateStore stateStore : stateStores) { globalStoreNames.add(stateStore.name()); - stateStore.init(processorContext, stateStore); + stateStore.init(processorContext, stateStore, config); } return 
Collections.unmodifiableSet(globalStoreNames); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 5f32a3b7a..8cae6682e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -237,7 +237,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re @Override public void init(final ProcessorContext context, - final StateStore root) { + final StateStore root, + final StreamsConfig config) { throw new UnsupportedOperationException(ERROR_MESSAGE); } @@ -436,7 +437,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re @Override public void init(final ProcessorContext context, - final StateStore root) { + final StateStore root, + final StreamsConfig config) { throw new UnsupportedOperationException(ERROR_MESSAGE); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 73bffc80e..703f11618 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -44,6 +44,14 @@ public class SinkNode extends ProcessorNode { this.partitioner = partitioner; } + public Serializer getKeySer() { + return keySerializer; + } + + public Serializer getValueSer() { + return valSerializer; + } + /** * @throws UnsupportedOperationException if this method adds a child to a sink node */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java index 87505cab2..c17921411 
100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java @@ -52,6 +52,14 @@ public class SourceNode extends ProcessorNode { this(name, topics, null, keyDeserializer, valDeserializer); } + public Deserializer getKeyDeSer() { + return keyDeserializer; + } + + public Deserializer getValueDeSer() { + return valDeserializer; + } + K deserializeKey(final String topic, final Headers headers, final byte[] data) { return keyDeserializer.deserialize(topic, headers, data); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index bb347de9d..a08fc3ebb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; @@ -49,9 +50,10 @@ class CachingKeyValueStore @Override public void init(final ProcessorContext context, - final StateStore root) { + final StateStore root, + final StreamsConfig config) { initInternal(context); - super.init(context, root); + super.init(context, root, config); // save the stream thread as we only ever want to trigger a flush // when the stream thread is the current thread. 
streamThread = Thread.currentThread(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index edea7e0d4..149b7587e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -47,9 +48,9 @@ class CachingSessionStore } @Override - public void init(final ProcessorContext context, final StateStore root) { + public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) { initInternal((InternalProcessorContext) context); - super.init(context, root); + super.init(context, root, config); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index 0edd8f265..e5f730026 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -54,9 +55,10 @@ class CachingWindowStore } 
@Override - public void init(final ProcessorContext context, final StateStore root) { + public void init(final ProcessorContext context, final StateStore root, + final StreamsConfig config) { initInternal((InternalProcessorContext) context); - super.init(context, root); + super.init(context, root, config); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java index a924af600..59018946a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; @@ -40,8 +41,9 @@ public class ChangeLoggingKeyValueBytesStore @Override public void init(final ProcessorContext context, - final StateStore root) { - super.init(context, root); + final StateStore root, + final StreamsConfig config) { + super.init(context, root, config); final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()); changeLogger = new StoreChangeLogger<>( name(), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java index 361f8a53b..bcab8a4d3 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.processor.ProcessorContext; @@ -41,8 +42,8 @@ class ChangeLoggingSessionBytesStore } @Override - public void init(final ProcessorContext context, final StateStore root) { - super.init(context, root); + public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) { + super.init(context, root, config); final String topic = ProcessorStateManager.storeChangelogTopic( context.applicationId(), name()); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java index c58e9f09e..df072f817 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -49,9 +50,10 @@ class ChangeLoggingWindowBytesStore @Override public void init(final ProcessorContext context, - final StateStore root) { + final 
StateStore root, + final StreamsConfig config) { this.context = context; - super.init(context, root); + super.init(context, root, config); final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()); changeLogger = new StoreChangeLogger<>( name(), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index b37c39e3b..918925f10 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; @@ -47,7 +48,8 @@ public class InMemoryKeyValueStore implements KeyValueStore { @Override public void init(final ProcessorContext context, - final StateStore root) { + final StateStore root, + final StreamsConfig config) { if (root != null) { // register the store diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index 65f53880b..a24d44630 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import 
org.apache.kafka.common.serialization.BytesSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; @@ -182,7 +183,7 @@ public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuf } @Override - public void init(final ProcessorContext context, final StateStore root) { + public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) { final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context; bufferSizeSensor = Sensors.createBufferSizeSensor(this, internalProcessorContext); bufferCountSensor = Sensors.createBufferCountSensor(this, internalProcessorContext); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java index 7d1b279fb..fd4d264d8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java @@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.ProcessorContext; @@ -90,7 +91,7 @@ public class InMemoryWindowStore implements WindowStore { @Override @SuppressWarnings("unchecked") - public void init(final ProcessorContext context, final StateStore root) { + public void init(final 
ProcessorContext context, final StateStore root, final StreamsConfig config) { this.context = (InternalProcessorContext) context; final StreamsMetricsImpl metrics = this.context.metrics(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index d69df13f4..0f4882561 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; @@ -74,7 +75,7 @@ public class MemoryLRUCache implements KeyValueStore { } @Override - public void init(final ProcessorContext context, final StateStore root) { + public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) { // register the store context.register(root, (key, value) -> { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 51da3ed1b..23918e1ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import 
org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -80,7 +81,8 @@ public class MeteredKeyValueStore @Override public void init(final ProcessorContext context, - final StateStore root) { + final StateStore root, + final StreamsConfig config) { metrics = (StreamsMetricsImpl) context.metrics(); taskName = context.taskId().toString(); @@ -104,12 +106,12 @@ public class MeteredKeyValueStore if (restoreTime.shouldRecord()) { measureLatency( () -> { - super.init(context, root); + super.init(context, root, config); return null; }, restoreTime); } else { - super.init(context, root); + super.init(context, root, config); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 4631601b1..0fde94189 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; @@ -67,8 +68,15 @@ public class MeteredSessionStore @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context, - final StateStore root) { + final StateStore root, + final StreamsConfig config) { //noinspection unchecked + if (keySerde != null) { + keySerde.configure(config.originals(), true); + } + if (valueSerde != null) { + valueSerde.configure(config.originals(), false); + } 
serdes = new StateSerdes<>( ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), keySerde == null ? (Serde) context.keySerde() : keySerde, @@ -89,7 +97,7 @@ public class MeteredSessionStore // register and possibly restore the state from the logs final long startNs = time.nanoseconds(); try { - super.init(context, root); + super.init(context, root, config); } finally { metrics.recordLatency( restoreTime, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java index 1c104915c..b92fbdc68 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.streams.state.StateSerdes; @@ -49,7 +50,7 @@ class MeteredTimestampedWindowStore @SuppressWarnings("unchecked") @Override - void initStoreSerde(final ProcessorContext context) { + void initStoreSerde(final ProcessorContext context, final StreamsConfig config) { serdes = new StateSerdes<>( ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), keySerde == null ? 
(Serde) context.keySerde() : keySerde, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 6d2eaab3d..184b97ec4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; @@ -69,9 +70,10 @@ public class MeteredWindowStore @Override public void init(final ProcessorContext context, - final StateStore root) { + final StateStore root, + final StreamsConfig config) { this.context = context; - initStoreSerde(context); + initStoreSerde(context, config); metrics = (StreamsMetricsImpl) context.metrics(); taskName = context.taskId().toString(); @@ -87,7 +89,7 @@ public class MeteredWindowStore // register and possibly restore the state from the logs final long startNs = time.nanoseconds(); try { - super.init(context, root); + super.init(context, root, config); } finally { metrics.recordLatency( restoreTime, @@ -98,7 +100,13 @@ public class MeteredWindowStore } @SuppressWarnings("unchecked") - void initStoreSerde(final ProcessorContext context) { + void initStoreSerde(final ProcessorContext context, final StreamsConfig config) { + if (keySerde != null) { + keySerde.configure(config.originals(), true); + } + if (valueSerde != null) { + valueSerde.configure(config.originals(), false); + } serdes = new StateSerdes<>( ProcessorStateManager.storeChangelogTopic(context.applicationId(), 
name()), keySerde == null ? (Serde) context.keySerde() : keySerde, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index f733c80e6..a5192c671 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback; import org.apache.kafka.streams.processor.ProcessorContext; @@ -148,7 +149,7 @@ public class RocksDBSegmentedBytesStore implements SegmentedBytesStore { } @Override - public void init(final ProcessorContext context, final StateStore root) { + public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) { this.context = (InternalProcessorContext) context; final StreamsMetricsImpl metrics = this.context.metrics(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 2ca3ad390..3c5ecd873 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -187,7 +187,8 @@ public class RocksDBStore implements KeyValueStore { } public void init(final ProcessorContext context, - final StateStore root) { + final StateStore root, + final StreamsConfig config) { // open the DB dir internalProcessorContext = 
context; openDB(context); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 3b634ebea..614effceb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -43,9 +44,10 @@ public class RocksDBWindowStore } @Override - public void init(final ProcessorContext context, final StateStore root) { + public void init(final ProcessorContext context, final StateStore root, + final StreamsConfig config) { this.context = context; - super.init(context, root); + super.init(context, root, config); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java index 62dc8e059..9801a4138 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.TimestampedBytesStore; /** @@ -44,8 +45,9 @@ public abstract class WrappedStateStore implements S @Override public void init(final ProcessorContext 
context, - final StateStore root) { - wrapped.init(context, root); + final StateStore root, + final StreamsConfig config) { + wrapped.init(context, root, config); } @SuppressWarnings("unchecked") diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 6b8b5b529..ab50740ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -476,7 +476,63 @@ public class KafkaStreamsTest { globalStreams.metadataForKey("store", "key", (topic, key, value, numPartitions) -> 0); } + static class MyStringSerializer extends StringSerializer { + boolean configured = false; + boolean called = false; + + @Override + public void configure(final Map configs, final boolean isKey) { + super.configure(configs, isKey); + configured = true; + } + + @Override + public byte[] serialize(final String topic, final String data) { + called = true; + return super.serialize(topic, data); + } + + boolean configured() { + return !called || configured; + } + } + static class MyStringDeserializer extends StringDeserializer { + boolean configured = false; + boolean called = false; + + @Override + public void configure(final Map configs, final boolean isKey) { + super.configure(configs, isKey); + configured = true; + } + @Override + public String deserialize(final String topic, final byte[] data) { + called = true; + return super.deserialize(topic, data); + } + + boolean configured() { + return !called || configured; + } + } + + public static class MyStringSerde extends Serdes.WrapperSerde { + public MyStringSerde() { + super(new MyStringSerializer(), new MyStringDeserializer()); + } + + @Override + public void configure(final Map configs, final boolean isKey) { + super.configure(configs, isKey); + } + public boolean configured() { + if (!((MyStringSerializer) this.serializer()).configured()) return false; 
+ if (!((MyStringDeserializer) this.deserializer()).configured()) return false; + return true; + } + } @Test + @SuppressWarnings("unchecked") public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception { final AtomicBoolean keepRunning = new AtomicBoolean(true); KafkaStreams streams = null; @@ -486,7 +542,9 @@ public class KafkaStreamsTest { final String topic = "input"; CLUSTER.createTopics(topic); - builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String())) + final MyStringSerde keyTestSerde = new MyStringSerde(); + final MyStringSerde valueTestSerde = new MyStringSerde(); + builder.stream(topic, Consumed.with(keyTestSerde, valueTestSerde)) .foreach((key, value) -> { try { latch.countDown(); @@ -510,6 +568,8 @@ public class KafkaStreamsTest { assertTrue("Timed out waiting to receive single message", latch.await(30, TimeUnit.SECONDS)); assertFalse(streams.close(Duration.ofMillis(10))); + assertTrue(keyTestSerde.configured()); + assertTrue(valueTestSerde.configured()); } finally { // stop the thread so we don't interfere with other tests etc keepRunning.set(false); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index 7493b0691..0a2317d7d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -19,12 +19,14 @@ package org.apache.kafka.streams.integration; import kafka.utils.MockTime; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.LongDeserializer; 
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreamsTest.MyStringSerde; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -39,6 +41,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.TestUtils; @@ -51,9 +54,11 @@ import org.junit.experimental.categories.Category; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Properties; import static java.time.Duration.ofMillis; +import static org.junit.Assert.assertTrue; /** * Similar to KStreamAggregationIntegrationTest but with dedupping enabled @@ -111,17 +116,22 @@ public class KStreamAggregationDedupIntegrationTest { @Test + @SuppressWarnings("unchecked") public void shouldReduce() throws Exception { produceMessages(System.currentTimeMillis()); + final MyStringSerde keyTestSerde = new MyStringSerde(); + final MyStringSerde valueTestSerde = new MyStringSerde(); groupedStream .reduce(reducer, Materialized.as("reduce-by-key")) .toStream() - .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); + .to(outputTopic, Produced.with(keyTestSerde, valueTestSerde)); startStreams(); produceMessages(System.currentTimeMillis()); + assertTrue(keyTestSerde.configured()); + assertTrue(valueTestSerde.configured()); validateReceivedMessages( new StringDeserializer(), new StringDeserializer(), @@ -170,16 +180,79 @@ public class 
KStreamAggregationDedupIntegrationTest { ); } + static class MyIntSerializer extends IntegerSerializer { + boolean configured = false; + boolean called = false; + + MyIntSerializer() { + super(); + } + + @Override + public void configure(final Map configs, final boolean isKey) { + super.configure(configs, isKey); + configured = true; + } + + @Override + public byte[] serialize(final String topic, final Integer data) { + called = true; + return super.serialize(topic, data); + } + + boolean configured() { + return !called || configured; + } + } + static class MyIntDeserializer extends IntegerDeserializer { + boolean configured = false; + boolean called = false; + + @Override + public void configure(final Map configs, final boolean isKey) { + super.configure(configs, isKey); + configured = true; + } + @Override + public Integer deserialize(final String topic, final byte[] data) { + called = true; + return super.deserialize(topic, data); + } + + boolean configured() { + return !called || configured; + } + } + + class MyIntegerSerde extends Serdes.WrapperSerde { + public MyIntegerSerde() { + super(new MyIntSerializer(), new MyIntDeserializer()); + } + @Override + public void configure(final Map configs, final boolean isKey) { + super.configure(configs, isKey); + } + + public boolean configured() { + if (!((MyIntSerializer) this.serializer()).configured()) return false; + if (!((MyIntDeserializer) this.deserializer()).configured()) return false; + return true; + } + } @Test + @SuppressWarnings("unchecked") public void shouldGroupByKey() throws Exception { final long timestamp = mockTime.milliseconds(); produceMessages(timestamp); produceMessages(timestamp); - stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String())) + final MyIntegerSerde keyTestSerde = new MyIntegerSerde(); + final MyStringSerde valueTestSerde = new MyStringSerde(); + stream.groupByKey(Grouped.with(keyTestSerde, valueTestSerde)) .windowedBy(TimeWindows.of(ofMillis(500L))) 
.count(Materialized.as("count-windows")) - .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()) + .toStream((windowedKey, value) -> ((Windowed)windowedKey).key() + "@" + + ((Windowed)windowedKey).window().start()) .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); startStreams(); @@ -197,6 +270,8 @@ public class KStreamAggregationDedupIntegrationTest { KeyValue.pair("5@" + window, 2L) ) ); + assertTrue(keyTestSerde.configured()); + assertTrue(valueTestSerde.configured()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java index 4038bfd68..cf3fc9eb2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java @@ -83,18 +83,19 @@ public class KStreamSessionWindowAggregateProcessorTest { private SessionStore sessionStore; private InternalMockProcessorContext context; private Metrics metrics; + private StreamsConfig config; @Before public void initializeStore() { final File stateDir = TestUtils.tempDirectory(); metrics = new Metrics(); + config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()); final MockStreamsMetrics metrics = new MockStreamsMetrics(KStreamSessionWindowAggregateProcessorTest.this.metrics); context = new InternalMockProcessorContext( stateDir, Serdes.String(), Serdes.String(), - metrics, - new StreamsConfig(StreamsTestUtils.getStreamsConfig()), + metrics, config, NoOpRecordCollector::new, new ThreadCache(new LogContext("testCache "), 100000, metrics) ) { @@ -104,11 +105,11 @@ public class KStreamSessionWindowAggregateProcessorTest { } }; - initStore(true); + initStore(true, config); processor.init(context); } - private void 
initStore(final boolean enableCaching) { + private void initStore(final boolean enableCaching, StreamsConfig config) { final StoreBuilder> storeBuilder = Stores.sessionStoreBuilder( Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)), @@ -121,7 +122,7 @@ public class KStreamSessionWindowAggregateProcessorTest { } sessionStore = storeBuilder.build(); - sessionStore.init(context, sessionStore); + sessionStore.init(context, sessionStore, config); } @After @@ -273,7 +274,7 @@ public class KStreamSessionWindowAggregateProcessorTest { @Test public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() { - initStore(false); + initStore(false, config); processor.init(context); context.setTime(0); @@ -293,7 +294,7 @@ public class KStreamSessionWindowAggregateProcessorTest { @Test public void shouldImmediatelyForwardRemovedSessionsWhenMerging() { - initStore(false); + initStore(false, config); processor.init(context); context.setTime(0); @@ -313,7 +314,7 @@ public class KStreamSessionWindowAggregateProcessorTest { @Test public void shouldLogAndMeterWhenSkippingNullKey() { - initStore(false); + initStore(false, config); processor.init(context); context.setRecordContext(new ProcessorRecordContext(-1, -2, -3, "topic", null)); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); @@ -340,7 +341,7 @@ public class KStreamSessionWindowAggregateProcessorTest { sessionMerger ).get(); - initStore(false); + initStore(false, config); processor.init(context); // dummy record to advance stream time diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java index 228cfc8d9..598cae4ee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java @@ -151,7 +151,7 @@ public class KTableSuppressProcessorMetricsTest { final MockInternalProcessorContext context = new MockInternalProcessorContext(); context.setCurrentNode(new ProcessorNode("testNode")); - buffer.init(context, buffer); + buffer.init(context, buffer, null); processor.init(context); final long timestamp = 100L; diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java index 354fdd5e9..eebfb7273 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -87,7 +87,7 @@ public class KTableSuppressProcessorTest { final MockInternalProcessorContext context = new MockInternalProcessorContext(); context.setCurrentNode(new ProcessorNode("testNode")); - buffer.init(context, buffer); + buffer.init(context, buffer, null); processor.init(context); this.processor = processor; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java index b36557cda..a7e225dfc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java @@ -152,7 +152,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForKeyValueStore() { final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME); try { - store.init(null, null); + store.init(null, null, null); 
fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -161,7 +161,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForTimestampedKeyValueStore() { final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME); try { - store.init(null, null); + store.init(null, null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -170,7 +170,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForWindowStore() { final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME); try { - store.init(null, null); + store.init(null, null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -179,7 +179,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForTimestampedWindowStore() { final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME); try { - store.init(null, null); + store.init(null, null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -188,7 +188,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForSessionStore() { final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME); try { - store.init(null, null); + store.init(null, null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index fe4d94880..5f9b076b6 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -547,7 +547,7 @@ public class ProcessorContextImplTest { assertTrue(store.persistent()); assertTrue(store.isOpen()); - checkThrowsUnsupportedOperation(() -> store.init(null, null), "init()"); + checkThrowsUnsupportedOperation(() -> store.init(null, null, null), "init()"); checkThrowsUnsupportedOperation(store::close, "close()"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java index 34d9050c7..55dbdf986 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/NoOpWindowStore.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state; import java.time.Instant; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -59,7 +60,7 @@ public class NoOpWindowStore implements ReadOnlyWindowStore, StateStore { } @Override - public void init(final ProcessorContext context, final StateStore root) { + public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) { } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java index 71e15d46d..0e8fa03e4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java @@ -75,7 +75,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { 
context = new InternalMockProcessorContext(null, null, null, null, cache); topic = "topic"; context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic, null)); - store.init(context, null); + store.init(context, null, null); } @After @@ -93,7 +93,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { .withCachingEnabled(); final KeyValueStore store = (KeyValueStore) storeBuilder.build(); - store.init(context, store); + store.init(context, store, null); return store; } @@ -126,7 +126,7 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest { cache = EasyMock.niceMock(ThreadCache.class); context = new InternalMockProcessorContext(null, null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic, null)); - store.init(context, null); + store.init(context, null, null); cache.flush("0_0-store"); EasyMock.expectLastCall().andThrow(new NullPointerException("Simulating an error on flush")); EasyMock.replay(cache); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java index 66b27f0a2..8b9515cb3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java @@ -81,7 +81,7 @@ public class CachingSessionStoreTest { cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics())); final InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, "topic", null)); - cachingStore.init(context, cachingStore); + cachingStore.init(context, cachingStore, null); } @After diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java index da49dded4..b8bc016c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java @@ -99,7 +99,7 @@ public class CachingWindowStoreTest { topic = "topic"; context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache); context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, topic, null)); - cachingStore.init(context, cachingStore); + cachingStore.init(context, cachingStore, null); } @After diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java index 11f7c4c76..da2dbf14e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java @@ -72,7 +72,7 @@ public class ChangeLoggingKeyValueBytesStoreTest { collector, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); context.setTime(0); - store.init(context, store); + store.init(context, store, null); } @After diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java index 94eee76f6..cc0b18962 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java @@ 
-77,11 +77,11 @@ public class ChangeLoggingSessionBytesStoreTest { private void init() { EasyMock.expect(context.taskId()).andReturn(taskId); EasyMock.expect(context.recordCollector()).andReturn(collector); - inner.init(context, store); + inner.init(context, store, null); EasyMock.expectLastCall(); EasyMock.replay(inner, context); - store.init(context, store); + store.init(context, store, null); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java index 2c19c48aa..97530bdc7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreTest.java @@ -77,7 +77,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStoreTest { collector, new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))); context.setTime(0); - store.init(context, store); + store.init(context, store, null); } @After diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java index edf210eae..a1f09d6b3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java @@ -78,11 +78,11 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest { private void init() { EasyMock.expect(context.taskId()).andReturn(taskId); EasyMock.expect(context.recordCollector()).andReturn(collector); - inner.init(context, store); + inner.init(context, store, 
null); EasyMock.expectLastCall(); EasyMock.replay(inner, context); - store.init(context, store); + store.init(context, store, null); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java index d7ad6d22d..99a5b67b5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java @@ -75,11 +75,11 @@ public class ChangeLoggingWindowBytesStoreTest { private void init() { EasyMock.expect(context.taskId()).andReturn(taskId); EasyMock.expect(context.recordCollector()).andReturn(collector); - inner.init(context, store); + inner.init(context, store, null); EasyMock.expectLastCall(); EasyMock.replay(inner, context); - store.init(context, store); + store.init(context, store, null); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java index 7ca0edbce..246d76cf3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java @@ -76,7 +76,7 @@ public class CompositeReadOnlyKeyValueStoreTest { store.init(new InternalMockProcessorContext(new StateSerdes<>(ProcessorStateManager.storeChangelogTopic("appId", storeName), Serdes.String(), Serdes.String()), new NoOpRecordCollector()), - store); + store, null); return store; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java index 0848970cd..ed4ed3136 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStoreTest.java @@ -37,7 +37,7 @@ public class InMemoryKeyValueLoggedStoreTest extends AbstractKeyValueStoreTest { .withLoggingEnabled(Collections.singletonMap("retention.ms", "1000")); final StateStore store = storeBuilder.build(); - store.init(context, store); + store.init(context, store, null); return (KeyValueStore) store; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java index ef5d6dcfb..66e81441b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java @@ -39,7 +39,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { (Serde) context.valueSerde()); final StateStore store = storeBuilder.build(); - store.init(context, store); + store.init(context, store, null); return (KeyValueStore) store; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java index 1bd404599..054378b8e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java @@ -46,7 +46,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest { (Serde) context.valueSerde()); final StateStore store = storeBuilder.build(); - store.init(context, 
store); + store.init(context, store, null); return (KeyValueStore) store; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java index e7f5ed068..a60f46729 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java @@ -110,7 +110,7 @@ public class InMemoryWindowStoreTest { Serdes.Integer(), Serdes.String()).build(); - store.init(context, store); + store.init(context, store, null); return store; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index b8fc88e62..c6799c07b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -96,7 +96,7 @@ public class MeteredKeyValueStoreTest { private void init() { replay(inner, context); - metered.init(context, metered); + metered.init(context, metered, null); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index b349f178a..b9035458c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -95,7 +95,7 @@ public class MeteredSessionStoreTest { private void init() { replay(inner, context); - metered.init(context, metered); + metered.init(context, metered, null); } @Test diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java index a3522f311..91055b294 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java @@ -75,7 +75,7 @@ public class MeteredTimestampWindowStoreTest { EasyMock.expectLastCall(); EasyMock.replay(innerStoreMock); - store.init(context, store); + store.init(context, store, null); store.close(); EasyMock.verify(innerStoreMock); } @@ -85,7 +85,7 @@ public class MeteredTimestampWindowStoreTest { EasyMock.expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null); EasyMock.replay(innerStoreMock); - store.init(context, store); + store.init(context, store, null); assertNull(store.fetch("a", 0)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index 0f60d2494..717805891 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -99,7 +99,7 @@ public class MeteredTimestampedKeyValueStoreTest { private void init() { replay(inner, context); - metered.init(context, metered); + metered.init(context, metered, null); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index 962888ae9..4bd5cfaaa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -59,6 +59,7 @@ public class MeteredWindowStoreTest { private InternalMockProcessorContext context; @SuppressWarnings("unchecked") private final WindowStore innerStoreMock = createNiceMock(WindowStore.class); + private StreamsConfig config; private final MeteredWindowStore store = new MeteredWindowStore<>( innerStoreMock, 10L, // any size @@ -77,12 +78,13 @@ public class MeteredWindowStoreTest { public void setUp() { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test"); + config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()); context = new InternalMockProcessorContext( TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), streamsMetrics, - new StreamsConfig(StreamsTestUtils.getStreamsConfig()), + config, NoOpRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0, streamsMetrics) ); @@ -91,7 +93,7 @@ public class MeteredWindowStoreTest { @Test public void testMetrics() { replay(innerStoreMock); - store.init(context, store); + store.init(context, store, config); final JmxReporter reporter = new JmxReporter("kafka.streams"); metrics.addReporter(reporter); assertTrue(reporter.containsMbean(String.format("kafka.streams:type=stream-%s-metrics,client-id=%s,task-id=%s,%s-id=%s", @@ -102,10 +104,10 @@ public class MeteredWindowStoreTest { @Test public void shouldRecordRestoreLatencyOnInit() { - innerStoreMock.init(context, store); + innerStoreMock.init(context, store, config); expectLastCall(); replay(innerStoreMock); - store.init(context, store); + store.init(context, store, config); final Map metrics = context.metrics().metrics(); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", 
"mocked-store")).metricValue()); @@ -118,7 +120,7 @@ public class MeteredWindowStoreTest { expectLastCall(); replay(innerStoreMock); - store.init(context, store); + store.init(context, store, config); store.put("a", "a"); final Map metrics = context.metrics().metrics(); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); @@ -131,7 +133,7 @@ public class MeteredWindowStoreTest { expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyWindowStoreIterator()); replay(innerStoreMock); - store.init(context, store); + store.init(context, store, config); store.fetch("a", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; final Map metrics = context.metrics().metrics(); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); @@ -144,7 +146,7 @@ public class MeteredWindowStoreTest { expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators., byte[]>emptyIterator()); replay(innerStoreMock); - store.init(context, store); + store.init(context, store, config); store.fetch("a", "b", ofEpochMilli(1), ofEpochMilli(1)).close(); // recorded on close; final Map metrics = context.metrics().metrics(); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); @@ -158,7 +160,7 @@ public class MeteredWindowStoreTest { expectLastCall(); replay(innerStoreMock); - store.init(context, store); + store.init(context, store, config); store.flush(); final Map metrics = context.metrics().metrics(); assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue()); @@ -172,7 +174,7 @@ public class MeteredWindowStoreTest { expectLastCall(); 
replay(innerStoreMock); - store.init(context, store); + store.init(context, store, config); store.close(); verify(innerStoreMock); } @@ -182,7 +184,7 @@ public class MeteredWindowStoreTest { expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 0)).andReturn(null); replay(innerStoreMock); - store.init(context, store); + store.init(context, store, config); assertNull(store.fetch("a", 0)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java index aad740387..e228b2e34 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreStub.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.internals.ApiUtils; import org.apache.kafka.streams.kstream.Windowed; @@ -251,7 +252,7 @@ public class ReadOnlyWindowStoreStub implements ReadOnlyWindowStore, } @Override - public void init(final ProcessorContext context, final StateStore root) {} + public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) {} @Override public void flush() {} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java index bdf0379dd..3db0d6580 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java @@ -46,7 +46,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest { (Serde) 
context.valueSerde()); final StateStore store = storeBuilder.build(); - store.init(context, store); + store.init(context, store, null); return (KeyValueStore) store; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index d0dd13320..b2d9d0e32 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -133,7 +133,7 @@ public class RocksDBSegmentedBytesStoreTest { new NoOpRecordCollector(), new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())) ); - bytesStore.init(context, bytesStore); + bytesStore.init(context, bytesStore, null); } @After @@ -298,7 +298,7 @@ public class RocksDBSegmentedBytesStoreTest { schema ); - bytesStore.init(context, bytesStore); + bytesStore.init(context, bytesStore, null); final List, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L)); assertThat( results, @@ -335,7 +335,7 @@ public class RocksDBSegmentedBytesStoreTest { schema ); - bytesStore.init(context, bytesStore); + bytesStore.init(context, bytesStore, null); final List, Long>> results = toList(bytesStore.fetch(Bytes.wrap(key.getBytes()), 0L, 60_000L)); assertThat( results, @@ -355,7 +355,7 @@ public class RocksDBSegmentedBytesStoreTest { // need to create a segment so we can attempt to write to it again. 
bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50)); bytesStore.close(); - bytesStore.init(context, bytesStore); + bytesStore.init(context, bytesStore, null); bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(100)); } @@ -400,7 +400,7 @@ public class RocksDBSegmentedBytesStoreTest { @Test public void shouldRespectBulkLoadOptionsDuringInit() { - bytesStore.init(context, bytesStore); + bytesStore.init(context, bytesStore, null); final String key = "a"; bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L)); bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java index 0786c3723..e3f4a14dc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java @@ -70,7 +70,7 @@ public class RocksDBSessionStoreTest { 0, new MockStreamsMetrics(new Metrics()))); - sessionStore.init(context, sessionStore); + sessionStore.init(context, sessionStore, null); } @After @@ -191,7 +191,7 @@ public class RocksDBSessionStoreTest { Serdes.String(), Serdes.Long()).build(); - sessionStore.init(context, sessionStore); + sessionStore.init(context, sessionStore, null); sessionStore.put(new Windowed<>("a", new SessionWindow(0, 0)), 1L); sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 10)), 2L); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index db458eb77..6d4ef9790 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -64,6 +64,7 @@ public class RocksDBStoreTest { private File dir; private final Serializer stringSerializer = new StringSerializer(); private final Deserializer stringDeserializer = new StringDeserializer(); + StreamsConfig config; InternalMockProcessorContext context; RocksDBStore rocksDBStore; @@ -74,10 +75,11 @@ public class RocksDBStoreTest { props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MockRocksDbConfigSetter.class); rocksDBStore = getRocksDBStore(); dir = TestUtils.tempDirectory(); + config = new StreamsConfig(props); context = new InternalMockProcessorContext(dir, Serdes.String(), Serdes.String(), - new StreamsConfig(props)); + config); } RocksDBStore getRocksDBStore() { @@ -91,7 +93,7 @@ public class RocksDBStoreTest { @Test public void shouldRespectBulkloadOptionsDuringInit() { - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); final StateRestoreListener restoreListener = context.getRestoreListener(rocksDBStore.name()); @@ -110,7 +112,7 @@ public class RocksDBStoreTest { @Test public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() { - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); final String message = "how can a 4 ounce bird carry a 2lb coconut"; int intKey = 1; @@ -171,7 +173,7 @@ public class RocksDBStoreTest { new Bytes(stringSerializer.serialize(null, "3")), stringSerializer.serialize(null, "c"))); - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); rocksDBStore.putAll(entries); rocksDBStore.flush(); @@ -194,7 +196,7 @@ public class RocksDBStoreTest { @Test public void shouldTogglePrepareForBulkloadSetting() { - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); final RocksDBStore.RocksDBBatchingRestoreCallback restoreListener = 
(RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback; @@ -209,7 +211,7 @@ public class RocksDBStoreTest { public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() { final List> entries = getKeyValueEntries(); - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); context.restore(rocksDBStore.name(), entries); final RocksDBStore.RocksDBBatchingRestoreCallback restoreListener = @@ -226,7 +228,7 @@ public class RocksDBStoreTest { public void shouldRestoreAll() { final List> entries = getKeyValueEntries(); - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); context.restore(rocksDBStore.name(), entries); assertEquals( @@ -248,7 +250,7 @@ public class RocksDBStoreTest { @Test public void shouldPutOnlyIfAbsentValue() { - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); final Bytes keyBytes = new Bytes(stringSerializer.serialize(null, "one")); final byte[] valueBytes = stringSerializer.serialize(null, "A"); final byte[] valueBytesUpdate = stringSerializer.serialize(null, "B"); @@ -265,7 +267,7 @@ public class RocksDBStoreTest { final List> entries = getKeyValueEntries(); entries.add(new KeyValue<>("1".getBytes(UTF_8), null)); - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); context.restore(rocksDBStore.name(), entries); final KeyValueIterator iterator = rocksDBStore.all(); @@ -289,7 +291,7 @@ public class RocksDBStoreTest { // this will restore key "1" as WriteBatch applies updates in order entries.add(new KeyValue<>("1".getBytes(UTF_8), "restored".getBytes(UTF_8))); - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); context.restore(rocksDBStore.name(), entries); final KeyValueIterator iterator = rocksDBStore.all(); @@ -322,7 +324,7 @@ public class RocksDBStoreTest { public void 
shouldRestoreThenDeleteOnRestoreAll() { final List> entries = getKeyValueEntries(); - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); context.restore(rocksDBStore.name(), entries); @@ -362,7 +364,7 @@ public class RocksDBStoreTest { @Test public void shouldThrowNullPointerExceptionOnNullPut() { - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); try { rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")); fail("Should have thrown NullPointerException on null put()"); @@ -373,7 +375,7 @@ public class RocksDBStoreTest { @Test public void shouldThrowNullPointerExceptionOnNullPutAll() { - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); try { rocksDBStore.put(null, stringSerializer.serialize(null, "someVal")); fail("Should have thrown NullPointerException on null put()"); @@ -384,7 +386,7 @@ public class RocksDBStoreTest { @Test public void shouldThrowNullPointerExceptionOnNullGet() { - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); try { rocksDBStore.get(null); fail("Should have thrown NullPointerException on null get()"); @@ -395,7 +397,7 @@ public class RocksDBStoreTest { @Test public void shouldThrowNullPointerExceptionOnDelete() { - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); try { rocksDBStore.delete(null); fail("Should have thrown NullPointerException on deleting null key"); @@ -406,7 +408,7 @@ public class RocksDBStoreTest { @Test public void shouldThrowNullPointerExceptionOnRange() { - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); try { rocksDBStore.range(null, new Bytes(stringSerializer.serialize(null, "2"))); fail("Should have thrown NullPointerException on deleting null key"); @@ -417,7 +419,7 @@ public class RocksDBStoreTest { @Test(expected = 
ProcessorStateException.class) public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException { - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); Utils.delete(dir); rocksDBStore.put( new Bytes(stringSerializer.serialize(null, "anyKey")), @@ -438,7 +440,7 @@ public class RocksDBStoreTest { new StreamsConfig(props)); enableBloomFilters = false; - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); final List expectedValues = new ArrayList<>(); expectedValues.add("a"); @@ -463,7 +465,7 @@ public class RocksDBStoreTest { // reopen with Bloom Filters enabled // should open fine without errors enableBloomFilters = true; - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); for (final KeyValue keyValue : keyValues) { final byte[] valBytes = rocksDBStore.get(new Bytes(keyValue.key)); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java index f49527b0d..d65f7174f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java @@ -53,7 +53,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest { LogCaptureAppender.setClassLoggerToDebug(RocksDBTimestampedStore.class); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode")); LogCaptureAppender.unregister(appender); @@ -251,7 +251,7 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest { // check that still in upgrade mode LogCaptureAppender appender 
= LogCaptureAppender.createAndRegister(); - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in upgrade mode")); LogCaptureAppender.unregister(appender); rocksDBStore.close(); @@ -270,14 +270,14 @@ public class RocksDBTimestampedStoreTest extends RocksDBStoreTest { // check that still in regular mode appender = LogCaptureAppender.createAndRegister(); - rocksDBStore.init(context, rocksDBStore); + rocksDBStore.init(context, rocksDBStore, config); assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular mode")); LogCaptureAppender.unregister(appender); } private void prepareOldStore() { final RocksDBStore keyValueStore = new RocksDBStore(DB_NAME); - keyValueStore.init(context, keyValueStore); + keyValueStore.init(context, keyValueStore, config); keyValueStore.put(new Bytes("key1".getBytes()), "1".getBytes()); keyValueStore.put(new Bytes("key2".getBytes()), "22".getBytes()); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 42b1b8c37..04acad290 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -122,7 +122,7 @@ public class RocksDBWindowStoreTest { Serdes.Integer(), Serdes.String()).build(); - store.init(context, store); + store.init(context, store, null); return store; } @@ -1311,7 +1311,7 @@ public class RocksDBWindowStoreTest { Serdes.String(), Serdes.String()).build(); - windowStore.init(context, windowStore); + windowStore.init(context, windowStore, null); windowStore.put("a", "0001", 0); windowStore.put("aa", "0002", 0); @@ -1375,7 +1375,7 @@ public class RocksDBWindowStoreTest { Serdes.Bytes(), 
Serdes.String()).build(); - windowStore.init(context, windowStore); + windowStore.init(context, windowStore, null); final Bytes key1 = Bytes.wrap(new byte[] {0}); final Bytes key2 = Bytes.wrap(new byte[] {0, 0}); diff --git a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java index a5e345efc..692d31602 100644 --- a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.test; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; @@ -62,7 +63,7 @@ public class GenericInMemoryKeyValueStore @SuppressWarnings("unchecked") /* This is a "dummy" store used for testing; it does not support restoring from changelog since we allow it to be serde-ignorant */ - public void init(final ProcessorContext context, final StateStore root) { + public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) { if (root != null) { context.register(root, null); } diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index b83936b8d..fd0d28021 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -111,7 +111,7 @@ public class KStreamTestDriver extends ExternalResource { private void initTopology(final ProcessorTopology topology, final List stores) { for (final StateStore store : stores) { try { - store.init(context, store); + store.init(context, store, null); } catch (final RuntimeException e) { new 
RuntimeException("Fatal exception initializing store.", e).printStackTrace(); throw e; diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java index 1729b2433..42df252d6 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java +++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueStore.java @@ -18,6 +18,7 @@ package org.apache.kafka.test; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; @@ -50,7 +51,8 @@ public class MockKeyValueStore implements KeyValueStore { @Override public void init(final ProcessorContext context, - final StateStore root) { + final StateStore root, + final StreamsConfig config) { context.register(root, stateRestoreCallback); initialized = true; closed = false; diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java index 08945d504..403c70513 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; @@ -73,7 +74,7 @@ public class NoOpReadOnlyStore } @Override - public void init(final ProcessorContext context, final StateStore root) { + public void init(final ProcessorContext context, final StateStore root, final StreamsConfig config) { if (rocksdbStore) { // cf. 
RocksDBStore new File(context.stateDir() + File.separator + "rocksdb" + File.separator + name).mkdirs(); diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java index 39ff61475..83687081a 100644 --- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java +++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java @@ -17,6 +17,7 @@ package org.apache.kafka.test; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; @@ -97,7 +98,7 @@ public class ReadOnlySessionStoreStub implements ReadOnlySessionStore extends ReadOnlyKeyValueStoreFacade @Override public void init(final ProcessorContext context, - final StateStore root) { - inner.init(context, root); + final StateStore root, + final StreamsConfig config) { + inner.init(context, root, config); } @Override diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java index f6f8f3361..9ac646d77 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/WindowStoreFacade.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.TimestampedWindowStore; @@ -32,8 +33,9 @@ public class WindowStoreFacade extends ReadOnlyWindowStoreFacade imp @Override public void 
init(final ProcessorContext context, - final StateStore root) { - inner.init(context, root); + final StateStore root, + final StreamsConfig config) { + inner.init(context, root, config); } @Override