diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java index b109588cdd..6297e2c295 100644 --- a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java +++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java @@ -138,7 +138,7 @@ public class PersistentDiskCache extends AbstractPersistentCache { Buffer bufferCopy = buffer.duplicate(); Runnable task = () -> { - if (lockSegmentWrite(segmentId)) { + if (writesPending.add(segmentId)) { try (FileChannel channel = new FileOutputStream(tempSegmentFile).getChannel()) { int fileSize = bufferCopy.write(channel); try { @@ -156,7 +156,7 @@ public class PersistentDiskCache extends AbstractPersistentCache { logger.error("Error while deleting corrupted segment file {}", segmentId, i); } } finally { - unlockSegmentWrite(segmentId); + writesPending.remove(segmentId); } } cleanUp(); diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java index 706f3d7c6a..38f2e30f3a 100644 --- a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java +++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java @@ -165,7 +165,7 @@ public class PersistentRedisCache extends AbstractPersistentCache { Buffer bufferCopy = buffer.duplicate(); Runnable task = () -> { - if (lockSegmentWrite(segmentId)) { + if (writesPending.add(segmentId)) { final ByteArrayOutputStream bos = new ByteArrayOutputStream(); try (WritableByteChannel channel = Channels.newChannel(bos); Jedis redis = redisPool.getResource()) { while (bufferCopy.hasRemaining()) { @@ -177,7 +177,7 @@ public class PersistentRedisCache extends AbstractPersistentCache { } catch (Throwable t) { logger.error("Error writing segment {} to cache: {}", segmentId, t); } finally { - unlockSegmentWrite(segmentId); + writesPending.remove(segmentId); } } }; diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java index 4c467f9c25..6da3db64a2 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java @@ -30,8 +30,10 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.util.HashSet; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -45,13 +47,13 @@ public abstract class AbstractPersistentCache implements PersistentCache, Closea protected ExecutorService executor; protected AtomicLong cacheSize = new AtomicLong(0); protected PersistentCache nextCache; - protected final HashSet writesPending; + protected final Set writesPending; protected SegmentCacheStats segmentCacheStats; public AbstractPersistentCache() { executor = Executors.newFixedThreadPool(THREADS); - writesPending = new HashSet<>(); + writesPending = ConcurrentHashMap.newKeySet(); } public PersistentCache linkWith(AbstractPersistentCache nextCache) { @@ -139,20 +141,6 @@ public abstract class AbstractPersistentCache implements PersistentCache, Closea } public int getWritesPending() { - synchronized (writesPending) { return writesPending.size(); } } - - protected boolean lockSegmentWrite(String segmentId) { - synchronized (writesPending) { - return writesPending.add(segmentId); - } - } - - protected void unlockSegmentWrite(String segmentId) { - synchronized (writesPending) { - writesPending.remove(segmentId); - } - } -}