diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java b/jmh-benchmarks/src/main/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java new file mode 100644 index 0000000..62661d2 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import java.util.Collections; +import org.apache.kafka.common.metrics.Metrics; + +public class MockStreamsMetrics extends StreamsMetricsImpl { + + public MockStreamsMetrics(Metrics metrics) { + super(metrics, "mock-stream-metrics", + Collections.emptyMap()); + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreBenchmark.java new file mode 100644 index 0000000..47178ef --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreBenchmark.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.test.MockProcessorContext; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +/** + * This is a simple example of a JMH benchmark. + * + * The sample code provided by the JMH project is a great place to start learning how to write correct benchmarks: + * http://hg.openjdk.java.net/code-tools/jmh/file/tip/jmh-samples/src/main/java/org/openjdk/jmh/samples/ + */ +@State(Scope.Thread) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class CachingKeyValueStoreBenchmark { + + private static final int DISTINCT_KEYS = 10_000; + + private static final String KEY = "the_key_to_use"; + + private static final String VALUE = "the quick brown fox jumped over the lazy dog the olympics are about to start"; + + private final String[] keys = new String[DISTINCT_KEYS]; + + private final String[] values = new String[DISTINCT_KEYS]; + + private InMemoryKeyValueStore underlyingStore; + private CachingKeyValueStore kvStore; + private MockProcessorContext context; + private ThreadCache cache; + + int counter; + + @Setup(Level.Trial) + public void setUp() { + for (int i = 0; i < DISTINCT_KEYS; ++i) { + keys[i] = KEY + i; + values[i] = VALUE + i; + } + final String storeName = "store"; + underlyingStore = new InMemoryKeyValueStore<>(storeName, Serdes.Bytes(), Serdes.ByteArray()); + kvStore = new CachingKeyValueStore(underlyingStore, Serdes.String(), Serdes.String()); + cache = new ThreadCache(new LogContext("testCache "), 32000, new MockStreamsMetrics(new Metrics())); + context = new MockProcessorContext((File)null, null, null, (RecordCollector) null, cache); + context.setRecordContext(new ProcessorRecordContext(10, 0, 0, "topic")); + kvStore.init(context, null); + } + + private byte[] bytesValue(final String value) { + return value.getBytes(); + } + + private Bytes bytesKey(final String key) { + return Bytes.wrap(key.getBytes()); + } + @Benchmark + public String testCachePerformance() { + counter++; + int index = counter % DISTINCT_KEYS; + String hashkey = keys[index]; + kvStore.put(bytesKey(hashkey), bytesValue(values[index])); + return new Bytes(kvStore.get(bytesKey(hashkey))).toString(); + } + + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include(CachingKeyValueStoreBenchmark.class.getSimpleName()) + .forks(2) + .build(); + + new Runner(opt).run(); + } + +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/test/MockProcessorContext.java b/jmh-benchmarks/src/main/java/org/apache/kafka/test/MockProcessorContext.java new file mode 100644 index 0000000..ce6cca8 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/test/MockProcessorContext.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateRestoreListener; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.AbstractProcessorContext; +import org.apache.kafka.streams.processor.internals.CompositeRestoreListener; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; +import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.streams.state.internals.ThreadCache; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class MockProcessorContext extends AbstractProcessorContext implements RecordCollector.Supplier { + + private final File stateDir; + private final Metrics metrics; + private final Serde keySerde; + private final Serde valSerde; + private final RecordCollector.Supplier recordCollectorSupplier; + private final Map storeMap = new LinkedHashMap<>(); + private final Map restoreFuncs = new HashMap<>(); + + private long timestamp = -1L; + + public MockProcessorContext(final File stateDir, + final StreamsConfig config) { + this(stateDir, null, null, new Metrics(), config, null, null); + } + + public MockProcessorContext(final StateSerdes serdes, + final RecordCollector collector) { + this(null, serdes.keySerde(), serdes.valueSerde(), collector, null); + } + + public MockProcessorContext(final StateSerdes serdes, + final RecordCollector collector, + final Metrics metrics) { + this(null, serdes.keySerde(), serdes.valueSerde(), metrics, new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() { + @Override + public RecordCollector recordCollector() { + return collector; + } + }, null); + } + + public MockProcessorContext(final File stateDir, + final Serde keySerde, + final Serde valSerde, + final RecordCollector collector, + final ThreadCache cache) { + this(stateDir, keySerde, valSerde, new Metrics(), new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() { + @Override + public RecordCollector recordCollector() { + return collector; + } + }, cache); + } + + private MockProcessorContext(final File stateDir, + final Serde keySerde, + final Serde valSerde, + final Metrics metrics, + final StreamsConfig config, + final RecordCollector.Supplier collectorSupplier, + final ThreadCache cache) { + super(new TaskId(0, 0), + config, + new MockStreamsMetrics(metrics), + null, + cache); + this.stateDir = stateDir; + this.keySerde = keySerde; + this.valSerde = valSerde; + this.metrics = metrics; + this.recordCollectorSupplier = collectorSupplier; + } + + @Override + public RecordCollector recordCollector() { + final RecordCollector recordCollector = recordCollectorSupplier.recordCollector(); + + if (recordCollector == null) { + throw new UnsupportedOperationException("No RecordCollector specified"); + } + return recordCollector; + } + + // serdes will override whatever specified in the configs + @Override + public Serde keySerde() { + return keySerde; + } + + @Override + public Serde valueSerde() { + return valSerde; + } + + // state mgr will be overridden by the state dir and store maps + @Override + public void initialized() {} + + @Override + public File stateDir() { + if (stateDir == null) { + throw new UnsupportedOperationException("State directory not specified"); + } + + return stateDir; + } + + @Override + public void register(final StateStore store, + final boolean deprecatedAndIgnoredLoggingEnabled, + final StateRestoreCallback func) { + storeMap.put(store.name(), store); + restoreFuncs.put(store.name(), func); + } + + @Override + public StateStore getStateStore(final String name) { + return storeMap.get(name); + } + + @Override public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) { + throw new UnsupportedOperationException("schedule() not supported."); + } + + @Override + public void schedule(final long interval) { } + + @Override + public void commit() { } + + @Override + @SuppressWarnings("unchecked") + public void forward(final K key, final V value) { + final ProcessorNode thisNode = currentNode; + for (final ProcessorNode childNode : (List>) thisNode.children()) { + currentNode = childNode; + try { + childNode.process(key, value); + } finally { + currentNode = thisNode; + } + } + } + + @Override + @SuppressWarnings("unchecked") + public void forward(final K key, final V value, final int childIndex) { + final ProcessorNode thisNode = currentNode; + final ProcessorNode childNode = (ProcessorNode) thisNode.children().get(childIndex); + currentNode = childNode; + try { + childNode.process(key, value); + } finally { + currentNode = thisNode; + } + } + + @Override + @SuppressWarnings("unchecked") + public void forward(final K key, final V value, final String childName) { + final ProcessorNode thisNode = currentNode; + for (final ProcessorNode childNode : (List>) thisNode.children()) { + if (childNode.name().equals(childName)) { + currentNode = childNode; + try { + childNode.process(key, value); + } finally { + currentNode = thisNode; + } + break; + } + } + } + + // allow only setting time but not other fields in for record context, + // and also not throwing exceptions if record context is not available. + public void setTime(final long timestamp) { + if (recordContext != null) { + recordContext = new ProcessorRecordContext(timestamp, recordContext.offset(), recordContext.partition(), recordContext.topic()); + } + this.timestamp = timestamp; + } + + @Override + public long timestamp() { + if (recordContext == null) { + return timestamp; + } + return recordContext.timestamp(); + } + + @Override + public String topic() { + if (recordContext == null) { + return null; + } + return recordContext.topic(); + } + + @Override + public int partition() { + if (recordContext == null) { + return -1; + } + return recordContext.partition(); + } + + @Override + public long offset() { + if (recordContext == null) { + return -1L; + } + return recordContext.offset(); + } + + Map allStateStores() { + return Collections.unmodifiableMap(storeMap); + } + + public void restore(final String storeName, final Iterable> changeLog) { + + final BatchingStateRestoreCallback restoreCallback = getBatchingRestoreCallback(restoreFuncs.get(storeName)); + final StateRestoreListener restoreListener = getStateRestoreListener(restoreCallback); + + restoreListener.onRestoreStart(null, storeName, 0L, 0L); + + List> records = new ArrayList<>(); + for (KeyValue keyValue : changeLog) { + records.add(keyValue); + } + + restoreCallback.restoreAll(records); + + restoreListener.onRestoreEnd(null, storeName, 0L); + } + + public void close() { + metrics.close(); + } + + private StateRestoreListener getStateRestoreListener(StateRestoreCallback restoreCallback) { + if (restoreCallback instanceof StateRestoreListener) { + return (StateRestoreListener) restoreCallback; + } + + return CompositeRestoreListener.NO_OP_STATE_RESTORE_LISTENER; + } + + private BatchingStateRestoreCallback getBatchingRestoreCallback(StateRestoreCallback restoreCallback) { + if (restoreCallback instanceof BatchingStateRestoreCallback) { + return (BatchingStateRestoreCallback) restoreCallback; + } + + return new WrappedBatchingStateRestoreCallback(restoreCallback); + } + +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/test/StreamsTestUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/test/StreamsTestUtils.java new file mode 100644 index 0000000..f510e5b --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/test/StreamsTestUtils.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Windowed; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +public class StreamsTestUtils { + + public static Properties getStreamsConfig(final String applicationId, + final String bootstrapServers, + final String keySerdeClassName, + final String valueSerdeClassName, + final Properties additional) { + + Properties streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + streamsConfiguration.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000"); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + streamsConfiguration.putAll(additional); + return streamsConfiguration; + + } + + public static Properties minimalStreamsConfig() { + final Properties properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString()); + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "anyserver:9092"); + return properties; + } + + public static List> toList(final Iterator> iterator) { + final List> results = new ArrayList<>(); + + while (iterator.hasNext()) { + results.add(iterator.next()); + } + return results; + } + + public static void verifyKeyValueList(final List> expected, final List> actual) { + for (int i = 0; i < actual.size(); i++) { + final KeyValue expectedKv = expected.get(i); + final KeyValue actualKv = actual.get(i); + } + } + + public static void verifyWindowedKeyValue(final KeyValue, byte[]> actual, + final Windowed expectedKey, + final String expectedValue) { + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/test/TestUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/test/TestUtils.java new file mode 100644 index 0000000..4bdf833 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/test/TestUtils.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.Random; +import java.util.Set; + +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper functions for writing unit tests + */ +public class TestUtils { + private static final Logger log = LoggerFactory.getLogger(TestUtils.class); + + public static final File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir")); + + public static final String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + public static final String DIGITS = "0123456789"; + public static final String LETTERS_AND_DIGITS = LETTERS + DIGITS; + + public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets"; + public static final Set INTERNAL_TOPICS = Collections.singleton(GROUP_METADATA_TOPIC_NAME); + + /* A consistent random number generator to make tests repeatable */ + public static final Random SEEDED_RANDOM = new Random(192348092834L); + public static final Random RANDOM = new Random(); + public static final long DEFAULT_MAX_WAIT_MS = 15000; + + /** + * Create a temporary relative directory in the default temporary-file directory with the given prefix. + * + * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix + */ + public static File tempDirectory(final String prefix) { + return tempDirectory(null, prefix); + } + + /** + * Create a temporary relative directory in the default temporary-file directory with a + * prefix of "kafka-" + * + * @return the temporary directory just created. + */ + public static File tempDirectory() { + return tempDirectory(null); + } + + /** + * Create a temporary relative directory in the specified parent directory with the given prefix. + * + * @param parent The parent folder path name, if null using the default temporary-file directory + * @param prefix The prefix of the temporary directory, if null using "kafka-" as default prefix + */ + public static File tempDirectory(final Path parent, String prefix) { + final File file; + prefix = prefix == null ? "kafka-" : prefix; + try { + file = parent == null ? + Files.createTempDirectory(prefix).toFile() : Files.createTempDirectory(parent, prefix).toFile(); + } catch (final IOException ex) { + throw new RuntimeException("Failed to create a temp dir", ex); + } + file.deleteOnExit(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + Utils.delete(file); + } catch (IOException e) { + log.error("Error deleting {}", file.getAbsolutePath(), e); + } + } + }); + + return file; + } +}