diff --git a/oak-run/pom.xml b/oak-run/pom.xml
index c103845c03..ad38a9cd1f 100644
--- a/oak-run/pom.xml
+++ b/oak-run/pom.xml
@@ -35,6 +35,7 @@
2.4.17
- 54000000
+ 57671680
diff --git a/oak-segment-remote/pom.xml b/oak-segment-remote/pom.xml
index a705e401c4..793de6b125 100644
--- a/oak-segment-remote/pom.xml
+++ b/oak-segment-remote/pom.xml
@@ -39,11 +39,18 @@
${guava.osgi.import},
org.apache.jackrabbit.oak.segment.spi*,
!org.apache.jackrabbit.oak.segment*,
+ !net.sf.cglib.asm.util,
+ !org.apache.tools.ant,
+ !org.apache.tools.ant.types,
*
- org.apache.jackrabbit.oak.segment.remote*
+ org.apache.jackrabbit.oak.segment.remote
+
+ org.apache.servicemix.bundles.*,
+ commons-pool2
+
@@ -108,6 +115,23 @@
${project.version}
provided
+
+
+
+ org.apache.servicemix.bundles
+ org.apache.servicemix.bundles.jedis
+ 3.3.0_1
+
+
+ org.apache.servicemix.bundles
+ org.apache.servicemix.bundles.cglib
+ 3.1_1
+
+
+ org.apache.commons
+ commons-pool2
+ 2.6.2
+
@@ -120,5 +144,10 @@
junit
test
+
+ com.github.kstyrc
+ embedded-redis
+ 0.6
+
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java
new file mode 100644
index 0000000000..03a5d757af
--- /dev/null
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import static org.apache.jackrabbit.oak.segment.remote.persistentcache.Configuration.PID;
+import static org.apache.jackrabbit.oak.segment.remote.persistentcache.PersistentDiskCache.DEFAULT_MAX_CACHE_SIZE_MB;
+import static org.apache.jackrabbit.oak.segment.remote.persistentcache.PersistentRedisCache.DEFAULT_REDIS_CACHE_EXPIRE_SECONDS;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+@ObjectClassDefinition(
+ pid = {PID},
+ name = "Apache Jackrabbit Oak Remote Persistent Cache Service",
+ description = "Persistent cache for the Oak Segment Node Store")
+public @interface Configuration {
+
+ String PID = "org.apache.jackrabbit.oak.segment.remote.RemotePersistentCacheService";
+
+ @AttributeDefinition(
+ name = "Disk cache persistence",
+ description = "Boolean value indicating that the local disk persisted cache should be used for segment store"
+ )
+ boolean diskCacheEnabled() default false;
+
+ @AttributeDefinition(
+ name = "Disk cache persistence directory",
+ description = "Path on the file system where disk cache persistence data will be stored."
+ )
+ String diskCacheDirectory() default "";
+
+ @AttributeDefinition(
+ name = "Disk cache persistence maximum size",
+ description = "Disk cache size (in MB). Default value is " + DEFAULT_MAX_CACHE_SIZE_MB
+ )
+ int diskCacheMaxSizeMB() default DEFAULT_MAX_CACHE_SIZE_MB;
+
+ @AttributeDefinition(
+ name = "Redis cache persistence",
+ description = "Boolean value indicating that the redis persisted cache should be used for segment store"
+ )
+ boolean redisCacheEnabled() default false;
+
+ @AttributeDefinition(
+ name = "Redis cache persistence host",
+ description = "Remote redis server host"
+ )
+ String redisCacheHost() default "";
+
+ @AttributeDefinition(
+ name = "Redis cache persistence port",
+ description = "Remote redis server port (0 = default)"
+ )
+ int redisCachePort() default 0;
+
+ @AttributeDefinition(
+ name = "Redis cache persistence entries expiry interval",
+ description = "Number of seconds to keep the entries in the cache. Default value is " + DEFAULT_REDIS_CACHE_EXPIRE_SECONDS
+ )
+ int redisCacheExpireSeconds() default DEFAULT_REDIS_CACHE_EXPIRE_SECONDS;
+
+ @AttributeDefinition(
+ name = "Redis cache db index",
+ description = "Redis cache db index (see Jedis#select(int))"
+ )
+ int redisDBIndex() default 1;
+
+ @AttributeDefinition(
+ name = "Redis socket timeout",
+ description = "Number of seconds to wait for response for request"
+ )
+ int redisSocketTimeout() default 10;
+
+ @AttributeDefinition(
+ name = "Redis connection timeout",
+ description = "Number of seconds to wait for redis connection to be established"
+ )
+ int redisConnectionTimeout() default 50;
+
+ @AttributeDefinition(
+ name = "Redis Minimum Connections",
+ description = "Minimum number of established connections that need to be kept in the pool"
+ )
+ int redisMinConnections() default 10;
+
+ @AttributeDefinition(
+ name = "Redis Maximum Connections",
+ description = "Maximum number of connections required by the business"
+ )
+ int redisMaxConnections() default 100;
+
+ @AttributeDefinition(
+ name = "Redis Maximum Total Connections",
+ description = "Includes the number of idle connections as a surplus"
+ )
+ int redisMaxTotalConnections() default 200;
+}
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java
new file mode 100644
index 0000000000..43a86814e0
--- /dev/null
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.File;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * This {@code IOMonitor} implementations registers the following monitoring endpoints
+ * with the Metrics library if available:
+ *
+ * - {@link #OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_BYTES}:
+ * a meter metrics for the number of bytes read from segment disk cache
+ * - {@link #OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_BYTES}:
+ * a meter metrics for the number of bytes written to segment disk cache
+ * - {@link #OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_TIME}:
+ * a timer metrics for the time spent reading from segment disk cache
+ * - {@link #OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_TIME}:
+ * a timer metrics for the time spent writing to segment disk cache
+ *
+ */
+public class DiskCacheIOMonitor extends IOMonitorAdapter {
+ public static final String OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_BYTES = "oak.segment.cache.disk.segment-read-bytes";
+ public static final String OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_BYTES = "oak.segment.cache.disk.segment-write-bytes";
+ public static final String OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_TIME = "oak.segment.cache.disk.segment-read-time";
+ public static final String OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_TIME = "oak.segment.cache.disk.segment-write-time";
+
+ private final MeterStats segmentReadBytes;
+ private final MeterStats segmentWriteBytes;
+ private final TimerStats segmentReadTime;
+ private final TimerStats segmentWriteTime;
+
+ public DiskCacheIOMonitor(@NotNull StatisticsProvider statisticsProvider) {
+ segmentReadBytes = statisticsProvider.getMeter(
+ OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_BYTES, StatsOptions.METRICS_ONLY);
+ segmentWriteBytes = statisticsProvider.getMeter(
+ OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_BYTES, StatsOptions.METRICS_ONLY);
+ segmentReadTime = statisticsProvider.getTimer(
+ OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_TIME, StatsOptions.METRICS_ONLY);
+ segmentWriteTime = statisticsProvider.getTimer(
+ OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_TIME, StatsOptions.METRICS_ONLY);
+ }
+
+ @Override
+ public void afterSegmentRead(File file, long msb, long lsb, int length, long elapsed) {
+ segmentReadBytes.mark(length);
+ segmentReadTime.update(elapsed, NANOSECONDS);
+ }
+
+ @Override
+ public void afterSegmentWrite(File file, long msb, long lsb, int length, long elapsed) {
+ segmentWriteBytes.mark(length);
+ segmentWriteTime.update(elapsed, NANOSECONDS);
+ }
+}
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
new file mode 100644
index 0000000000..81c3ed5472
--- /dev/null
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import com.google.common.base.Stopwatch;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.SegmentCacheStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileTime;
+import java.util.Comparator;
+import java.util.Spliterator;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+public class PersistentDiskCache extends AbstractPersistentCache {
+ private static final Logger logger = LoggerFactory.getLogger(PersistentDiskCache.class);
+ public static final int DEFAULT_MAX_CACHE_SIZE_MB = 512;
+ public static final String NAME = "Segment Disk Cache";
+
+ private final File directory;
+ private final long maxCacheSizeBytes;
+ private final IOMonitor diskCacheIOMonitor;
+
+ final AtomicBoolean cleanupInProgress = new AtomicBoolean(false);
+
+ final AtomicLong evictionCount = new AtomicLong();
+
+ private static final Comparator<Path> sortedByAccessTime = (path1, path2) -> {
+ try {
+ FileTime lastAccessFile1 = Files.readAttributes(path1, BasicFileAttributes.class).lastAccessTime();
+ FileTime lastAccessFile2 = Files.readAttributes(path2, BasicFileAttributes.class).lastAccessTime();
+ return lastAccessFile1.compareTo(lastAccessFile2);
+ } catch (IOException e) {
+ logger.error("A problem occurred while cleaning up the cache: ", e);
+ }
+ return 0;
+ };
+
+ public PersistentDiskCache(File directory, int cacheMaxSizeMB, IOMonitor diskCacheIOMonitor) {
+ this.directory = directory;
+ this.maxCacheSizeBytes = cacheMaxSizeMB * 1024L * 1024L;
+ this.diskCacheIOMonitor = diskCacheIOMonitor;
+ if (!directory.exists()) {
+ directory.mkdirs();
+ }
+
+ segmentCacheStats = new SegmentCacheStats(
+ NAME,
+ () -> maxCacheSizeBytes,
+ () -> Long.valueOf(directory.listFiles().length),
+ () -> FileUtils.sizeOfDirectory(directory),
+ () -> evictionCount.get());
+ }
+
+ @Override
+ protected Buffer readSegmentInternal(long msb, long lsb) {
+ try {
+ String segmentId = new UUID(msb, lsb).toString();
+ File segmentFile = new File(directory, segmentId);
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ if (segmentFile.exists()) {
+ diskCacheIOMonitor.beforeSegmentRead(segmentFile, msb, lsb, (int) segmentFile.length());
+ try (FileInputStream fis = new FileInputStream(segmentFile); FileChannel channel = fis.getChannel()) {
+ int length = (int) channel.size();
+
+ Buffer buffer = Buffer.allocateDirect(length);
+ if (buffer.readFully(channel, 0) < length) {
+ throw new EOFException();
+ }
+
+ long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+ diskCacheIOMonitor.afterSegmentRead(segmentFile, msb, lsb, (int) segmentFile.length(), elapsed);
+
+ buffer.flip();
+
+ return buffer;
+ } catch (FileNotFoundException e) {
+ logger.info("Segment {} deleted from file system!", segmentId);
+ } catch (IOException e) {
+ logger.error("Error loading segment {} from cache:", segmentId, e);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Exception while reading segment {} from the cache:", new UUID(msb, lsb), e);
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean containsSegment(long msb, long lsb) {
+ return new File(directory, new UUID(msb, lsb).toString()).exists();
+ }
+
+ @Override
+ public void writeSegment(long msb, long lsb, Buffer buffer) {
+ String segmentId = new UUID(msb, lsb).toString();
+ File segmentFile = new File(directory, segmentId);
+ Buffer bufferCopy = buffer.duplicate();
+
+ Runnable task = () -> {
+ if (lockSegmentWrite(segmentId)) {
+ try (FileChannel channel = new FileOutputStream(segmentFile).getChannel()) {
+ int fileSize = bufferCopy.write(channel);
+ cacheSize.addAndGet(fileSize);
+ } catch (Throwable t) {
+ logger.error("Error writing segment {} to cache: {}", segmentId, t);
+ } finally {
+ unlockSegmentWrite(segmentId);
+ }
+ }
+ cleanUp();
+ };
+
+ executor.execute(task);
+
+ if (nextCache != null) {
+ nextCache.writeSegment(msb, lsb, buffer);
+ }
+ }
+
+ private boolean isCacheFull() {
+ return cacheSize.get() >= maxCacheSizeBytes;
+ }
+
+ @Override
+ public void cleanUp() {
+ if (!cleanupInProgress.getAndSet(true)) {
+ try {
+ cleanUpInternal();
+ } finally {
+ cleanupInProgress.set(false);
+ }
+ }
+ }
+
+ private void cleanUpInternal() {
+ if (isCacheFull()) {
+ try {
+ Stream<Path> segmentsPaths = Files.walk(directory.toPath())
+ .sorted(sortedByAccessTime)
+ .filter(filePath -> !filePath.toFile().isDirectory());
+
+ StreamConsumer.forEach(segmentsPaths, (path, breaker) -> {
+
+ if (cacheSize.get() > maxCacheSizeBytes * 0.66) {
+ cacheSize.addAndGet(-path.toFile().length());
+ path.toFile().delete();
+ evictionCount.incrementAndGet();
+ } else {
+ breaker.stop();
+ }
+ });
+ } catch (IOException e) {
+ logger.error("A problem occurred while cleaning up the cache: ", e);
+ }
+ }
+ }
+
+ static class StreamConsumer {
+
+ public static class Breaker {
+ private boolean shouldBreak = false;
+
+ public void stop() {
+ shouldBreak = true;
+ }
+
+ boolean get() {
+ return shouldBreak;
+ }
+ }
+
+ public static <T> void forEach(Stream<T> stream, BiConsumer<T, Breaker> consumer) {
+ Spliterator<T> spliterator = stream.spliterator();
+ boolean hadNext = true;
+ Breaker breaker = new Breaker();
+
+ while (hadNext && !breaker.get()) {
+ hadNext = spliterator.tryAdvance(elem -> {
+ consumer.accept(elem, breaker);
+ });
+ }
+ }
+ }
+}
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
new file mode 100644
index 0000000000..706f3d7c6a
--- /dev/null
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import com.google.common.base.Stopwatch;
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.SegmentCacheStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.exceptions.JedisException;
+import redis.clients.jedis.params.SetParams;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+public class PersistentRedisCache extends AbstractPersistentCache {
+ private static final Logger logger = LoggerFactory.getLogger(PersistentRedisCache.class);
+ public static final int DEFAULT_REDIS_CACHE_EXPIRE_SECONDS = 3600 * 24 * 2;
+ public static final String NAME = "Segment Redis Cache";
+
+ private static final String REDIS_PREFIX = "SEGMENT";
+ private final IOMonitor redisCacheIOMonitor;
+ private JedisPool redisPool;
+ private SetParams setParamsWithExpire;
+
+ public PersistentRedisCache(String redisHost, int redisPort, int redisExpireSeconds, int redisSocketTimeout,
+ int redisConnectionTimeout, int redisMinConnections, int redisMaxConnections, int redisMaxTotalConnections,
+ int redisDBIndex, IOMonitor redisCacheIOMonitor) {
+ this.redisCacheIOMonitor = redisCacheIOMonitor;
+ int redisExpireSeconds1 = redisExpireSeconds < 0 ? DEFAULT_REDIS_CACHE_EXPIRE_SECONDS : redisExpireSeconds;
+ setParamsWithExpire = SetParams.setParams().ex(redisExpireSeconds1);
+
+ if (redisPort == 0) {
+ redisPort = 6379;
+ }
+
+ JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
+ jedisPoolConfig.setTestOnBorrow(true);
+ jedisPoolConfig.setMaxWaitMillis(redisSocketTimeout);
+ jedisPoolConfig.setMinIdle(redisMinConnections);
+ jedisPoolConfig.setMaxIdle(redisMaxConnections);
+ jedisPoolConfig.setMaxTotal(redisMaxTotalConnections);
+
+ this.redisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, redisConnectionTimeout,
+ redisSocketTimeout, null, redisDBIndex, null);
+ this.segmentCacheStats = new SegmentCacheStats(NAME, this::getRedisMaxMemory, this::getCacheElementCount,
+ this::getCurrentWeight, this::getNumberOfEvictedKeys);
+ }
+
+ private long getCacheElementCount() {
+ try(Jedis redis = redisPool.getResource()) {
+ return redis.dbSize();
+ } catch (JedisException e) {
+ logger.error("Error getting number of elements in redis", e);
+ }
+
+ return -1;
+ }
+
+ private long getRedisMaxMemory() {
+ try{
+ return Long.parseLong(getRedisProperty("memory", "maxmemory"));
+ } catch (JedisException | IOException e) {
+ logger.error("Error getting redis configuration value for 'maxmemory'", e);
+ }
+ return -1;
+ }
+
+ private long getCurrentWeight() {
+ try{
+ return Long.parseLong(getRedisProperty("memory", "used_memory"));
+ } catch (JedisException | IOException e) {
+ logger.error("Error getting number of elements in redis", e);
+ }
+ return -1;
+ }
+
+ private long getNumberOfEvictedKeys() {
+ try{
+ return Long.parseLong(getRedisProperty("stats", "evicted_keys"));
+ } catch (JedisException | IOException e) {
+ logger.error("Error getting number of evicted elements in redis", e);
+ }
+ return -1;
+ }
+
+ private String getRedisProperty(String section, String propertyName) throws IOException {
+ try(Jedis redis = redisPool.getResource()) {
+ String redisInfoString = redis.info(section);
+ Properties props = new Properties();
+ props.load(new StringReader(redisInfoString));
+ return (String) props.get(propertyName);
+ }
+ }
+
+ @Override
+ protected Buffer readSegmentInternal(long msb, long lsb) {
+ String segmentId = new UUID(msb, lsb).toString();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ try(Jedis redis = redisPool.getResource()) {
+ redisCacheIOMonitor.beforeSegmentRead(null, msb, lsb, 0);
+
+ final byte[] bytes = redis.get((REDIS_PREFIX + ":" + segmentId).getBytes());
+ if (bytes != null) {
+ long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+ redisCacheIOMonitor.afterSegmentRead(null, msb, lsb, bytes.length, elapsed);
+
+ Buffer buffer = Buffer.allocateDirect(bytes.length);
+
+ buffer.put(bytes);
+ buffer.flip();
+ return buffer;
+ }
+ } catch (Exception e) {
+ logger.error("Error loading segment {} from cache", segmentId, e);
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean containsSegment(long msb, long lsb) {
+ String segmentId = new UUID(msb, lsb).toString();
+
+ try (Jedis redis = redisPool.getResource()) {
+ return redis.exists((REDIS_PREFIX + ":" + segmentId).getBytes());
+ } catch (JedisException e) {
+ logger.error("Error checking segment existence {} in cache: {}", segmentId, e);
+ }
+
+ return false;
+ }
+
+ @Override
+ public void writeSegment(long msb, long lsb, Buffer buffer) {
+ String segmentId = new UUID(msb, lsb).toString();
+ Buffer bufferCopy = buffer.duplicate();
+
+ Runnable task = () -> {
+ if (lockSegmentWrite(segmentId)) {
+ final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try (WritableByteChannel channel = Channels.newChannel(bos); Jedis redis = redisPool.getResource()) {
+ while (bufferCopy.hasRemaining()) {
+ bufferCopy.write(channel);
+ }
+ final byte[] key = (REDIS_PREFIX + ":" + segmentId).getBytes();
+ redis.set(key, bos.toByteArray(), setParamsWithExpire);
+ cacheSize.addAndGet(bos.size());
+ } catch (Throwable t) {
+ logger.error("Error writing segment {} to cache: {}", segmentId, t);
+ } finally {
+ unlockSegmentWrite(segmentId);
+ }
+ }
+ };
+
+ executor.execute(task);
+
+ if (nextCache != null) {
+ nextCache.writeSegment(msb, lsb, buffer);
+ }
+ }
+
+ @Override
+ public void cleanUp() {
+ // do nothing
+ }
+}
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java
new file mode 100644
index 0000000000..7e139f230a
--- /dev/null
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.File;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * This {@code IOMonitor} implementations registers the following monitoring endpoints
+ * with the Metrics library if available:
+ *
+ * - {@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_BYTES}:
+ * a meter metrics for the number of bytes read from segment redis cache
+ * - {@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_BYTES}:
+ * a meter metrics for the number of bytes written to segment redis cache
+ * - {@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_TIME}:
+ * a timer metrics for the time spent reading from segment redis cache
+ * - {@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_TIME}:
+ * a timer metrics for the time spent writing to segment redis cache
+ *
+ */
+public class RedisCacheIOMonitor extends IOMonitorAdapter {
+ public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_BYTES = "oak.segment.cache.redis.segment-read-bytes";
+ public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_BYTES = "oak.segment.cache.redis.segment-write-bytes";
+ public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_TIME = "oak.segment.cache.redis.segment-read-time";
+ public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_TIME = "oak.segment.cache.redis.segment-write-time";
+
+ private final MeterStats segmentReadBytes;
+ private final MeterStats segmentWriteBytes;
+ private final TimerStats segmentReadTime;
+ private final TimerStats segmentWriteTime;
+
+ public RedisCacheIOMonitor(@NotNull StatisticsProvider statisticsProvider) {
+ segmentReadBytes = statisticsProvider.getMeter(
+ OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_BYTES, StatsOptions.METRICS_ONLY);
+ segmentWriteBytes = statisticsProvider.getMeter(
+ OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_BYTES, StatsOptions.METRICS_ONLY);
+ segmentReadTime = statisticsProvider.getTimer(
+ OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_TIME, StatsOptions.METRICS_ONLY);
+ segmentWriteTime = statisticsProvider.getTimer(
+ OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_TIME, StatsOptions.METRICS_ONLY);
+ }
+
+ @Override
+ public void afterSegmentRead(File file, long msb, long lsb, int length, long elapsed) {
+ segmentReadBytes.mark(length);
+ segmentReadTime.update(elapsed, NANOSECONDS);
+ }
+
+ @Override
+ public void afterSegmentWrite(File file, long msb, long lsb, int length, long elapsed) {
+ segmentWriteBytes.mark(length);
+ segmentWriteTime.update(elapsed, NANOSECONDS);
+ }
+}
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
new file mode 100644
index 0000000000..4fd074a9d3
--- /dev/null
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
+
+import com.google.common.io.Closer;
+
+import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.segment.spi.monitor.RoleStatisticsProvider;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+@Component(
+ configurationPolicy = ConfigurationPolicy.REQUIRE,
+ configurationPid = {Configuration.PID})
+public class RemotePersistentCacheService {
+ private ServiceRegistration registration;
+
+ private PersistentCache persistentCache;
+
+ private final Closer closer = Closer.create();
+
+ private OsgiWhiteboard osgiWhiteboard;
+
+ @Reference
+ private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
+
+ @Activate
+ public void activate(ComponentContext context, Configuration config) throws IOException {
+ osgiWhiteboard = new OsgiWhiteboard(context.getBundleContext());
+ persistentCache = createPersistentCache(config, closer);
+ registration = context.getBundleContext().registerService(PersistentCache.class.getName(), persistentCache, new Properties());
+ }
+
+ @Deactivate
+ public void deactivate() throws IOException {
+ if (registration != null) {
+ registration.unregister();
+ registration = null;
+ }
+ closeQuietly(closer);
+ persistentCache = null;
+ }
+
+ protected void registerCloseable(final Registration registration) {
+ closer.register(registration::unregister);
+ }
+
+ protected <T> Registration registerMBean(Class<T> clazz, T bean, String type, String name) {
+ return WhiteboardUtils.registerMBean(osgiWhiteboard, clazz, bean, type, name);
+ }
+
+ private PersistentCache createPersistentCache(Configuration configuration, Closer closer) {
+
+ RoleStatisticsProvider roleStatisticsProvider = new RoleStatisticsProvider(statisticsProvider, "remote_persistence");
+
+ DiskCacheIOMonitor diskCacheIOMonitor = new DiskCacheIOMonitor(roleStatisticsProvider);
+ RedisCacheIOMonitor redisCacheIOMonitor = new RedisCacheIOMonitor(roleStatisticsProvider);
+
+ if (configuration.diskCacheEnabled()) {
+ PersistentDiskCache persistentDiskCache = new PersistentDiskCache(new File(configuration.diskCacheDirectory()), configuration.diskCacheMaxSizeMB(), diskCacheIOMonitor);
+ closer.register(persistentDiskCache);
+
+ CacheStatsMBean diskCacheStatsMBean = persistentDiskCache.getCacheStats();
+ registerCloseable(registerMBean(CacheStatsMBean.class, diskCacheStatsMBean, CacheStats.TYPE, diskCacheStatsMBean.getName()));
+
+ if (configuration.redisCacheEnabled()) {
+ PersistentRedisCache redisCache = new PersistentRedisCache(configuration.redisCacheHost(), configuration.redisCachePort(), configuration.redisCacheExpireSeconds(), configuration.redisSocketTimeout(), configuration.redisConnectionTimeout(),
+ configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections(), configuration.redisDBIndex(), redisCacheIOMonitor);
+ persistentDiskCache.linkWith(redisCache);
+ closer.register(redisCache);
+
+ CacheStatsMBean redisCacheStatsMBean = redisCache.getCacheStats();
+ registerCloseable(registerMBean(CacheStatsMBean.class, redisCacheStatsMBean, CacheStats.TYPE, redisCacheStatsMBean.getName()));
+ }
+
+ return persistentDiskCache;
+ } else if (configuration.redisCacheEnabled()) {
+ PersistentRedisCache redisCache = new PersistentRedisCache(configuration.redisCacheHost(), configuration.redisCachePort(), configuration.redisCacheExpireSeconds(), configuration.redisSocketTimeout(), configuration.redisConnectionTimeout(),
+ configuration.redisMinConnections(), configuration.redisMaxConnections(), configuration.redisMaxTotalConnections(), configuration.redisDBIndex(), redisCacheIOMonitor);
+ closer.register(redisCache);
+
+ CacheStatsMBean redisCacheStatsMBean = redisCache.getCacheStats();
+ registerCloseable(registerMBean(CacheStatsMBean.class, redisCacheStatsMBean, CacheStats.TYPE, redisCacheStatsMBean.getName()));
+
+ return redisCache;
+ }
+
+ return null;
+ }
+}
diff --git a/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java
new file mode 100644
index 0000000000..c9945a9944
--- /dev/null
+++ b/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/AbstractPersistentCacheTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.jackrabbit.oak.segment.remote.persistentcache;
+
+import org.apache.jackrabbit.oak.commons.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.junit.Assert.*;
+
+public abstract class AbstractPersistentCacheTest {
+
+ protected static final int SEGMENTS = 1000;
+ protected static final int THREADS = 50;
+ protected static final int SEGMENTS_PER_THREAD = SEGMENTS / THREADS;
+ protected static final int TIMEOUT_COUNT = 50;
+
+ protected static final Executor executor = Executors.newFixedThreadPool(THREADS);
+
+ protected static final Consumer> runConcurrently = r -> {
+ for (int i = 0; i < THREADS; ++i) {
+ int finalI = i;
+ executor.execute(() -> {
+ for (int j = finalI * SEGMENTS_PER_THREAD; j < (finalI + 1) * SEGMENTS_PER_THREAD; ++j) {
+ r.accept(finalI, j);
+ }
+ });
+ }
+ };
+
+ protected AbstractPersistentCache persistentCache;
+
+ final AtomicInteger errors = new AtomicInteger(0);
+ final AtomicInteger done = new AtomicInteger(0);
+ int count; // for checking timeouts
+
+ protected Consumer> waitWhile = (r -> {
+ for (count = 0; r.get() && count < TIMEOUT_COUNT; ++count) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ }
+ });
+
+ @Test
+ public void writeAndReadManySegments() throws Exception {
+ final List testSegments = new ArrayList<>(SEGMENTS);
+ final List