diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index f0669a4..c0574df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -31,6 +31,9 @@ import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 class CachingKeyValueStore extends WrappedStateStore.AbstractStateStore implements KeyValueStore, CachedStateStore {
 
@@ -44,6 +47,9 @@ class CachingKeyValueStore extends WrappedStateStore.AbstractStateStore im
     private InternalProcessorContext context;
     private StateSerdes serdes;
     private Thread streamThread;
+    // Guards cache and underlying store: flush() and all mutations take the write lock;
+    // approximateNumEntries() and get() from non-stream threads take the read lock.
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     CachingKeyValueStore(final KeyValueStore underlying,
                          final Serde keySerde,
@@ -108,9 +114,14 @@ class CachingKeyValueStore extends WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public synchronized void flush() {
-        cache.flush(cacheName);
-        underlying.flush();
+    public void flush() {
+        lock.writeLock().lock();
+        try {
+            cache.flush(cacheName);
+            underlying.flush();
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
     @Override
@@ -131,10 +142,23 @@ class CachingKeyValueStore extends WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public synchronized byte[] get(final Bytes key) {
+    public byte[] get(final Bytes key) {
         validateStoreOpen();
         Objects.requireNonNull(key);
-        return getInternal(key);
+        // The stream thread takes the write lock; any other thread only needs the read lock.
+        // NOTE(review): presumably getInternal() may update the cache on the stream thread - confirm.
+        final Lock theLock;
+        if (Thread.currentThread().equals(streamThread)) {
+            theLock = lock.writeLock();
+        } else {
+            theLock = lock.readLock();
+        }
+        theLock.lock();
+        try {
+            return getInternal(key);
+        } finally {
+            theLock.unlock();
+        }
     }
 
     private byte[] getInternal(final Bytes key) {
@@ -176,50 +200,80 @@ class CachingKeyValueStore extends WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public synchronized long approximateNumEntries() {
-        validateStoreOpen();
-        return underlying.approximateNumEntries();
+    public long approximateNumEntries() {
+        lock.readLock().lock();
+        try {
+            validateStoreOpen();
+            return underlying.approximateNumEntries();
+        } finally {
+            lock.readLock().unlock();
+        }
     }
 
     @Override
-    public synchronized void put(final Bytes key, final byte[] value) {
+    public void put(final Bytes key, final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
-        validateStoreOpen();
-        putInternal(key, value);
+        lock.writeLock().lock();
+        try {
+            validateStoreOpen();
+            putInternal(key, value);
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
-    private synchronized void putInternal(final Bytes rawKey, final byte[] value) {
+    private void putInternal(final Bytes rawKey, final byte[] value) {
         Objects.requireNonNull(rawKey, "key cannot be null");
-        cache.put(cacheName, rawKey, new LRUCacheEntry(value, true, context.offset(),
+        lock.writeLock().lock();
+        try {
+            cache.put(cacheName, rawKey, new LRUCacheEntry(value, true, context.offset(),
                   context.timestamp(), context.partition(), context.topic()));
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
     @Override
-    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
+    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
-        final byte[] v = getInternal(key);
-        if (v == null) {
-            putInternal(key, value);
+        lock.writeLock().lock();
+        try {
+            final byte[] v = getInternal(key);
+            if (v == null) {
+                putInternal(key, value);
+            }
+            return v;
+        } finally {
+            lock.writeLock().unlock();
         }
-        return v;
     }
 
     @Override
-    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
-        for (KeyValue<Bytes, byte[]> entry : entries) {
-            put(entry.key, entry.value);
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        lock.writeLock().lock();
+        try {
+            for (KeyValue<Bytes, byte[]> entry : entries) {
+                put(entry.key, entry.value);
+            }
+        } finally {
+            lock.writeLock().unlock();
         }
     }
 
     @Override
-    public synchronized byte[] delete(final Bytes key) {
+    public byte[] delete(final Bytes key) {
         validateStoreOpen();
         Objects.requireNonNull(key);
-        final byte[] v = getInternal(key);
-        cache.delete(cacheName, key);
-        underlying.delete(key);
-        return v;
+        lock.writeLock().lock();
+        try {
+            final byte[] v = getInternal(key);
+            cache.delete(cacheName, key);
+            underlying.delete(key);
+            return v;
+        } finally {
+            lock.writeLock().unlock();
+        }
     }
 
     KeyValueStore underlying() {