diff --git a/oak-run/pom.xml b/oak-run/pom.xml index c103845c03..ad38a9cd1f 100644 --- a/oak-run/pom.xml +++ b/oak-run/pom.xml @@ -35,6 +35,7 @@ 2.4.17 - 54000000 + 57671680 diff --git a/oak-segment-remote/pom.xml b/oak-segment-remote/pom.xml index a705e401c4..793de6b125 100644 --- a/oak-segment-remote/pom.xml +++ b/oak-segment-remote/pom.xml @@ -39,11 +39,18 @@ ${guava.osgi.import}, org.apache.jackrabbit.oak.segment.spi*, !org.apache.jackrabbit.oak.segment*, + !net.sf.cglib.asm.util, + !org.apache.tools.ant, + !org.apache.tools.ant.types, * - org.apache.jackrabbit.oak.segment.remote* + org.apache.jackrabbit.oak.segment.remote + + org.apache.servicemix.bundles.*, + commons-pool2 + @@ -108,6 +115,23 @@ ${project.version} provided + + + + org.apache.servicemix.bundles + org.apache.servicemix.bundles.jedis + 3.3.0_1 + + + org.apache.servicemix.bundles + org.apache.servicemix.bundles.cglib + 3.1_1 + + + org.apache.commons + commons-pool2 + 2.6.2 + @@ -120,5 +144,10 @@ junit test + + com.github.kstyrc + embedded-redis + 0.6 + diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java new file mode 100644 index 0000000000..03a5d757af --- /dev/null +++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/Configuration.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.jackrabbit.oak.segment.remote.persistentcache; + +import static org.apache.jackrabbit.oak.segment.remote.persistentcache.Configuration.PID; +import static org.apache.jackrabbit.oak.segment.remote.persistentcache.PersistentDiskCache.DEFAULT_MAX_CACHE_SIZE_MB; +import static org.apache.jackrabbit.oak.segment.remote.persistentcache.PersistentRedisCache.DEFAULT_REDIS_CACHE_EXPIRE_SECONDS; + +import org.osgi.service.metatype.annotations.AttributeDefinition; +import org.osgi.service.metatype.annotations.ObjectClassDefinition; + +@ObjectClassDefinition( + pid = {PID}, + name = "Apache Jackrabbit Oak Remote Persistent Cache Service", + description = "Persistent cache for the Oak Segment Node Store") +public @interface Configuration { + + String PID = "org.apache.jackrabbit.oak.segment.remote.RemotePersistentCacheService"; + + @AttributeDefinition( + name = "Disk cache persistence", + description = "Boolean value indicating that the local disk persisted cache should be used for segment store" + ) + boolean diskCacheEnabled() default false; + + @AttributeDefinition( + name = "Disk cache persistence directory", + description = "Path on the file system where disk cache persistence data will be stored." + ) + String diskCacheDirectory() default ""; + + @AttributeDefinition( + name = "Disk cache persistence maximum size", + description = "Disk cache size (in MB). 
Default value is " + DEFAULT_MAX_CACHE_SIZE_MB + ) + int diskCacheMaxSizeMB() default DEFAULT_MAX_CACHE_SIZE_MB; + + @AttributeDefinition( + name = "Redis cache persistence", + description = "Boolean value indicating that the redis persisted cache should be used for segment store" + ) + boolean redisCacheEnabled() default false; + + @AttributeDefinition( + name = "Redis cache persistence host", + description = "Remote redis server host" + ) + String redisCacheHost() default ""; + + @AttributeDefinition( + name = "Redis cache persistence port", + description = "Remote redis server port (0 = default)" + ) + int redisCachePort() default 0; + + @AttributeDefinition( + name = "Redis cache persistence entries expiry interval", + description = "Number of seconds to keep the entries in the cache. Default value is " + DEFAULT_REDIS_CACHE_EXPIRE_SECONDS + ) + int redisCacheExpireSeconds() default DEFAULT_REDIS_CACHE_EXPIRE_SECONDS; + + @AttributeDefinition( + name = "Redis cache db index", + description = "Redis cache db index (see Jedis#select(int))" + ) + int redisDBIndex() default 1; + + @AttributeDefinition( + name = "Redis socket timeout", + description = "Number of seconds to wait for response for request" + ) + int redisSocketTimeout() default 10; + + @AttributeDefinition( + name = "Redis connection timeout", + description = "Number of seconds to wait for redis connection to be established" + ) + int redisConnectionTimeout() default 50; + + @AttributeDefinition( + name = "Redis Minimum Connections", + description = "Minimum number of established connections that need to be kept in the pool" + ) + int redisMinConnections() default 10; + + @AttributeDefinition( + name = "Redis Maximum Connections", + description = "Maximum number of connections required by the business" + ) + int redisMaxConnections() default 100; + + @AttributeDefinition( + name = "Redis Maximum Total Connections", + description = "Includes the number of idle connections as a surplus" + ) + int 
redisMaxTotalConnections() default 200; +} diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java new file mode 100644 index 0000000000..43a86814e0 --- /dev/null +++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/DiskCacheIOMonitor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.jackrabbit.oak.segment.remote.persistentcache; + +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; +import org.apache.jackrabbit.oak.stats.MeterStats; +import org.apache.jackrabbit.oak.stats.StatisticsProvider; +import org.apache.jackrabbit.oak.stats.StatsOptions; +import org.apache.jackrabbit.oak.stats.TimerStats; +import org.jetbrains.annotations.NotNull; + +import java.io.File; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +/** + * This {@code IOMonitor} implementations registers the following monitoring endpoints + * with the Metrics library if available: + * + */ +public class DiskCacheIOMonitor extends IOMonitorAdapter { + public static final String OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_BYTES = "oak.segment.cache.disk.segment-read-bytes"; + public static final String OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_BYTES = "oak.segment.cache.disk.segment-write-bytes"; + public static final String OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_TIME = "oak.segment.cache.disk.segment-read-time"; + public static final String OAK_SEGMENT_CACHE_DISk_SEGMENT_WRITE_TIME = "oak.segment.cache.disk.segment-write-time"; + + private final MeterStats segmentReadBytes; + private final MeterStats segmentWriteBytes; + private final TimerStats segmentReadTime; + private final TimerStats segmentWriteTime; + + public DiskCacheIOMonitor(@NotNull StatisticsProvider statisticsProvider) { + segmentReadBytes = statisticsProvider.getMeter( + OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_BYTES, StatsOptions.METRICS_ONLY); + segmentWriteBytes = statisticsProvider.getMeter( + OAK_SEGMENT_CACHE_DISK_SEGMENT_WRITE_BYTES, StatsOptions.METRICS_ONLY); + segmentReadTime = statisticsProvider.getTimer( + OAK_SEGMENT_CACHE_DISK_SEGMENT_READ_TIME, StatsOptions.METRICS_ONLY); + segmentWriteTime = statisticsProvider.getTimer( + OAK_SEGMENT_CACHE_DISk_SEGMENT_WRITE_TIME, StatsOptions.METRICS_ONLY); + } + + @Override + public void afterSegmentRead(File file, long msb, 
long lsb, int length, long elapsed) { + segmentReadBytes.mark(length); + segmentReadTime.update(elapsed, NANOSECONDS); + } + + @Override + public void afterSegmentWrite(File file, long msb, long lsb, int length, long elapsed) { + segmentWriteBytes.mark(length); + segmentWriteTime.update(elapsed, NANOSECONDS); + } +} diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java new file mode 100644 index 0000000000..81c3ed5472 --- /dev/null +++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentDiskCache.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.jackrabbit.oak.segment.remote.persistentcache; + +import com.google.common.base.Stopwatch; +import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.SegmentCacheStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.BasicFileAttributes; +import java.nio.file.attribute.FileTime; +import java.util.Comparator; +import java.util.Spliterator; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.stream.Stream; + +public class PersistentDiskCache extends AbstractPersistentCache { + private static final Logger logger = LoggerFactory.getLogger(PersistentDiskCache.class); + public static final int DEFAULT_MAX_CACHE_SIZE_MB = 512; + public static final String NAME = "Segment Disk Cache"; + + private final File directory; + private final long maxCacheSizeBytes; + private final IOMonitor diskCacheIOMonitor; + + final AtomicBoolean cleanupInProgress = new AtomicBoolean(false); + + final AtomicLong evictionCount = new AtomicLong(); + + private static final Comparator sortedByAccessTime = (path1, path2) -> { + try { + FileTime lastAccessFile1 = Files.readAttributes(path1, BasicFileAttributes.class).lastAccessTime(); + FileTime lastAccessFile2 = Files.readAttributes(path2, 
BasicFileAttributes.class).lastAccessTime(); + return lastAccessFile1.compareTo(lastAccessFile2); + } catch (IOException e) { + logger.error("A problem occurred while cleaning up the cache: ", e); + } + return 0; + }; + + public PersistentDiskCache(File directory, int cacheMaxSizeMB, IOMonitor diskCacheIOMonitor) { + this.directory = directory; + this.maxCacheSizeBytes = cacheMaxSizeMB * 1024L * 1024L; + this.diskCacheIOMonitor = diskCacheIOMonitor; + if (!directory.exists()) { + directory.mkdirs(); + } + + segmentCacheStats = new SegmentCacheStats( + NAME, + () -> maxCacheSizeBytes, + () -> Long.valueOf(directory.listFiles().length), + () -> FileUtils.sizeOfDirectory(directory), + () -> evictionCount.get()); + } + + @Override + protected Buffer readSegmentInternal(long msb, long lsb) { + try { + String segmentId = new UUID(msb, lsb).toString(); + File segmentFile = new File(directory, segmentId); + + Stopwatch stopwatch = Stopwatch.createStarted(); + if (segmentFile.exists()) { + diskCacheIOMonitor.beforeSegmentRead(segmentFile, msb, lsb, (int) segmentFile.length()); + try (FileInputStream fis = new FileInputStream(segmentFile); FileChannel channel = fis.getChannel()) { + int length = (int) channel.size(); + + Buffer buffer = Buffer.allocateDirect(length); + if (buffer.readFully(channel, 0) < length) { + throw new EOFException(); + } + + long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); + diskCacheIOMonitor.afterSegmentRead(segmentFile, msb, lsb, (int) segmentFile.length(), elapsed); + + buffer.flip(); + + return buffer; + } catch (FileNotFoundException e) { + logger.info("Segment {} deleted from file system!", segmentId); + } catch (IOException e) { + logger.error("Error loading segment {} from cache:", segmentId, e); + } + } + } catch (Exception e) { + logger.error("Exception while reading segment {} from the cache:", new UUID(msb, lsb), e); + } + + return null; + } + + @Override + public boolean containsSegment(long msb, long lsb) { + return new 
File(directory, new UUID(msb, lsb).toString()).exists(); + } + + @Override + public void writeSegment(long msb, long lsb, Buffer buffer) { + String segmentId = new UUID(msb, lsb).toString(); + File segmentFile = new File(directory, segmentId); + Buffer bufferCopy = buffer.duplicate(); + + Runnable task = () -> { + if (lockSegmentWrite(segmentId)) { + try (FileChannel channel = new FileOutputStream(segmentFile).getChannel()) { + int fileSize = bufferCopy.write(channel); + cacheSize.addAndGet(fileSize); + } catch (Throwable t) { + logger.error("Error writing segment {} to cache: {}", segmentId, t); + } finally { + unlockSegmentWrite(segmentId); + } + } + cleanUp(); + }; + + executor.execute(task); + + if (nextCache != null) { + nextCache.writeSegment(msb, lsb, buffer); + } + } + + private boolean isCacheFull() { + return cacheSize.get() >= maxCacheSizeBytes; + } + + @Override + public void cleanUp() { + if (!cleanupInProgress.getAndSet(true)) { + try { + cleanUpInternal(); + } finally { + cleanupInProgress.set(false); + } + } + } + + private void cleanUpInternal() { + if (isCacheFull()) { + try { + Stream segmentsPaths = Files.walk(directory.toPath()) + .sorted(sortedByAccessTime) + .filter(filePath -> !filePath.toFile().isDirectory()); + + StreamConsumer.forEach(segmentsPaths, (path, breaker) -> { + + if (cacheSize.get() > maxCacheSizeBytes * 0.66) { + cacheSize.addAndGet(-path.toFile().length()); + path.toFile().delete(); + evictionCount.incrementAndGet(); + } else { + breaker.stop(); + } + }); + } catch (IOException e) { + logger.error("A problem occurred while cleaning up the cache: ", e); + } + } + } + + static class StreamConsumer { + + public static class Breaker { + private boolean shouldBreak = false; + + public void stop() { + shouldBreak = true; + } + + boolean get() { + return shouldBreak; + } + } + + public static void forEach(Stream stream, BiConsumer consumer) { + Spliterator spliterator = stream.spliterator(); + boolean hadNext = true; + Breaker 
breaker = new Breaker(); + + while (hadNext && !breaker.get()) { + hadNext = spliterator.tryAdvance(elem -> { + consumer.accept(elem, breaker); + }); + } + } + } +} diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java new file mode 100644 index 0000000000..706f3d7c6a --- /dev/null +++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/PersistentRedisCache.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.jackrabbit.oak.segment.remote.persistentcache; + +import com.google.common.base.Stopwatch; +import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.SegmentCacheStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPoolConfig; +import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.params.SetParams; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.StringReader; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +public class PersistentRedisCache extends AbstractPersistentCache { + private static final Logger logger = LoggerFactory.getLogger(PersistentRedisCache.class); + public static final int DEFAULT_REDIS_CACHE_EXPIRE_SECONDS = 3600 * 24 * 2; + public static final String NAME = "Segment Redis Cache"; + + private static final String REDIS_PREFIX = "SEGMENT"; + private final IOMonitor redisCacheIOMonitor; + private JedisPool redisPool; + private SetParams setParamsWithExpire; + + public PersistentRedisCache(String redisHost, int redisPort, int redisExpireSeconds, int redisSocketTimeout, + int redisConnectionTimeout, int redisMinConnections, int redisMaxConnections, int redisMaxTotalConnections, + int redisDBIndex, IOMonitor redisCacheIOMonitor) { + this.redisCacheIOMonitor = redisCacheIOMonitor; + int redisExpireSeconds1 = redisExpireSeconds < 0 ? 
DEFAULT_REDIS_CACHE_EXPIRE_SECONDS : redisExpireSeconds; + setParamsWithExpire = SetParams.setParams().ex(redisExpireSeconds1); + + if (redisPort == 0) { + redisPort = 6379; + } + + JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); + jedisPoolConfig.setTestOnBorrow(true); + jedisPoolConfig.setMaxWaitMillis(redisSocketTimeout); + jedisPoolConfig.setMinIdle(redisMinConnections); + jedisPoolConfig.setMaxIdle(redisMaxConnections); + jedisPoolConfig.setMaxTotal(redisMaxTotalConnections); + + this.redisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, redisConnectionTimeout, + redisSocketTimeout, null, redisDBIndex, null); + this.segmentCacheStats = new SegmentCacheStats(NAME, this::getRedisMaxMemory, this::getCacheElementCount, + this::getCurrentWeight, this::getNumberOfEvictedKeys); + } + + private long getCacheElementCount() { + try(Jedis redis = redisPool.getResource()) { + return redis.dbSize(); + } catch (JedisException e) { + logger.error("Error getting number of elements in redis", e); + } + + return -1; + } + + private long getRedisMaxMemory() { + try{ + return Long.parseLong(getRedisProperty("memory", "maxmemory")); + } catch (JedisException | IOException e) { + logger.error("Error getting redis configuration value for 'maxmemory'", e); + } + return -1; + } + + private long getCurrentWeight() { + try{ + return Long.parseLong(getRedisProperty("memory", "used_memory")); + } catch (JedisException | IOException e) { + logger.error("Error getting number of elements in redis", e); + } + return -1; + } + + private long getNumberOfEvictedKeys() { + try{ + return Long.parseLong(getRedisProperty("stats", "evicted_keys")); + } catch (JedisException | IOException e) { + logger.error("Error getting number of evicted elements in redis", e); + } + return -1; + } + + private String getRedisProperty(String section, String propertyName) throws IOException { + try(Jedis redis = redisPool.getResource()) { + String redisInfoString = redis.info(section); + 
Properties props = new Properties(); + props.load(new StringReader(redisInfoString)); + return (String) props.get(propertyName); + } + } + + @Override + protected Buffer readSegmentInternal(long msb, long lsb) { + String segmentId = new UUID(msb, lsb).toString(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + try(Jedis redis = redisPool.getResource()) { + redisCacheIOMonitor.beforeSegmentRead(null, msb, lsb, 0); + + final byte[] bytes = redis.get((REDIS_PREFIX + ":" + segmentId).getBytes()); + if (bytes != null) { + long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS); + redisCacheIOMonitor.afterSegmentRead(null, msb, lsb, bytes.length, elapsed); + + Buffer buffer = Buffer.allocateDirect(bytes.length); + + buffer.put(bytes); + buffer.flip(); + return buffer; + } + } catch (Exception e) { + logger.error("Error loading segment {} from cache", segmentId, e); + } + + return null; + } + + @Override + public boolean containsSegment(long msb, long lsb) { + String segmentId = new UUID(msb, lsb).toString(); + + try (Jedis redis = redisPool.getResource()) { + return redis.exists((REDIS_PREFIX + ":" + segmentId).getBytes()); + } catch (JedisException e) { + logger.error("Error checking segment existence {} in cache: {}", segmentId, e); + } + + return false; + } + + @Override + public void writeSegment(long msb, long lsb, Buffer buffer) { + String segmentId = new UUID(msb, lsb).toString(); + Buffer bufferCopy = buffer.duplicate(); + + Runnable task = () -> { + if (lockSegmentWrite(segmentId)) { + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try (WritableByteChannel channel = Channels.newChannel(bos); Jedis redis = redisPool.getResource()) { + while (bufferCopy.hasRemaining()) { + bufferCopy.write(channel); + } + final byte[] key = (REDIS_PREFIX + ":" + segmentId).getBytes(); + redis.set(key, bos.toByteArray(), setParamsWithExpire); + cacheSize.addAndGet(bos.size()); + } catch (Throwable t) { + logger.error("Error writing segment {} to cache: {}", 
segmentId, t); + } finally { + unlockSegmentWrite(segmentId); + } + } + }; + + executor.execute(task); + + if (nextCache != null) { + nextCache.writeSegment(msb, lsb, buffer); + } + } + + @Override + public void cleanUp() { + // do nothing + } +} diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java new file mode 100644 index 0000000000..7e139f230a --- /dev/null +++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RedisCacheIOMonitor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.jackrabbit.oak.segment.remote.persistentcache;

import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.jackrabbit.oak.stats.StatsOptions;
import org.apache.jackrabbit.oak.stats.TimerStats;
import org.jetbrains.annotations.NotNull;

import java.io.File;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
 * This {@code IOMonitor} implementations registers the following monitoring endpoints
 * with the Metrics library if available:
 * <ul>
 * <li>{@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_BYTES}:
 * a meter metrics for the number of bytes read from segment redis cache</li>
 * <li>{@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_BYTES}:
 * a meter metrics for the number of bytes written to segment redis cache</li>
 * <li>{@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_TIME}:
 * a timer metrics for the time spent reading from segment redis cache</li>
 * <li>{@link #OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_TIME}:
 * a timer metrics for the time spent writing to segment redis cache</li>
 * </ul>
 */
public class RedisCacheIOMonitor extends IOMonitorAdapter {
    public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_BYTES = "oak.segment.cache.redis.segment-read-bytes";
    public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_BYTES = "oak.segment.cache.redis.segment-write-bytes";
    public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_TIME = "oak.segment.cache.redis.segment-read-time";
    public static final String OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_TIME = "oak.segment.cache.redis.segment-write-time";

    private final MeterStats segmentReadBytes;
    private final MeterStats segmentWriteBytes;
    private final TimerStats segmentReadTime;
    private final TimerStats segmentWriteTime;

    public RedisCacheIOMonitor(@NotNull StatisticsProvider statisticsProvider) {
        segmentReadBytes = statisticsProvider.getMeter(
                OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_BYTES, StatsOptions.METRICS_ONLY);
        segmentWriteBytes = statisticsProvider.getMeter(
                OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_BYTES, StatsOptions.METRICS_ONLY);
        segmentReadTime = statisticsProvider.getTimer(
                OAK_SEGMENT_CACHE_REDIS_SEGMENT_READ_TIME, StatsOptions.METRICS_ONLY);
        segmentWriteTime = statisticsProvider.getTimer(
                OAK_SEGMENT_CACHE_REDIS_SEGMENT_WRITE_TIME, StatsOptions.METRICS_ONLY);
    }

    // Records bytes read and read latency (nanoseconds) after each cache read.
    @Override
    public void afterSegmentRead(File file, long msb, long lsb, int length, long elapsed) {
        segmentReadBytes.mark(length);
        segmentReadTime.update(elapsed, NANOSECONDS);
    }

    // Records bytes written and write latency (nanoseconds) after each cache write.
    @Override
    public void afterSegmentWrite(File file, long msb, long lsb, int length, long elapsed) {
        segmentWriteBytes.mark(length);
        segmentWriteTime.update(elapsed, NANOSECONDS);
    }
}
diff --git a/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
new file mode 100644
index 0000000000..4fd074a9d3
--- /dev/null
+++ b/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/persistentcache/RemotePersistentCacheService.java
@@ -0,0 +1,122 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.jackrabbit.oak.segment.remote.persistentcache;

import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;

import com.google.common.io.Closer;

import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.segment.spi.monitor.RoleStatisticsProvider;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;

import java.io.File;
import java.io.IOException;
import java.util.Properties;

/**
 * OSGi component that assembles the configured persistent cache tiers (disk
 * and/or Redis, chained disk -> redis when both are enabled) and registers the
 * resulting {@link PersistentCache} as a service, together with JMX cache
 * statistics MBeans.
 */
@Component(
        configurationPolicy = ConfigurationPolicy.REQUIRE,
        configurationPid = {Configuration.PID})
public class RemotePersistentCacheService {
    private ServiceRegistration registration;

    private PersistentCache persistentCache;

    private final Closer closer = Closer.create();

    private OsgiWhiteboard osgiWhiteboard;

    @Reference
    private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;

    @Activate
    public void activate(ComponentContext context, Configuration config) throws IOException {
        osgiWhiteboard = new OsgiWhiteboard(context.getBundleContext());
        persistentCache = createPersistentCache(config, closer);
        // Guard: createPersistentCache returns null when neither cache tier is
        // enabled, and registerService(name, null, ...) would throw
        // IllegalArgumentException during activation.
        if (persistentCache != null) {
            registration = context.getBundleContext().registerService(PersistentCache.class.getName(), persistentCache, new Properties());
        }
    }

    @Deactivate
    public void deactivate() throws IOException {
        if (registration != null) {
            registration.unregister();
            registration = null;
        }
        closeQuietly(closer);
        persistentCache = null;
    }

    // Ties a whiteboard registration's lifetime to this component's closer.
    protected void registerCloseable(final Registration registration) {
        closer.register(registration::unregister);
    }

    protected <T> Registration registerMBean(Class<T> clazz, T bean, String type, String name) {
        return WhiteboardUtils.registerMBean(osgiWhiteboard, clazz, bean, type, name);
    }

    // Registers a cache's stats MBean and schedules its unregistration on close.
    private void registerCacheStatsMBean(CacheStatsMBean stats) {
        registerCloseable(registerMBean(CacheStatsMBean.class, stats, CacheStats.TYPE, stats.getName()));
    }

    // Builds a redis cache tier from the configuration (extracted because the
    // original duplicated this construction in both branches below).
    private PersistentRedisCache createRedisCache(Configuration c, RedisCacheIOMonitor monitor) {
        return new PersistentRedisCache(c.redisCacheHost(), c.redisCachePort(), c.redisCacheExpireSeconds(),
                c.redisSocketTimeout(), c.redisConnectionTimeout(), c.redisMinConnections(),
                c.redisMaxConnections(), c.redisMaxTotalConnections(), c.redisDBIndex(), monitor);
    }

    /**
     * Assembles the cache chain: disk only, redis only, disk -> redis, or
     * {@code null} when neither tier is enabled. All created caches are
     * registered with the given closer.
     */
    private PersistentCache createPersistentCache(Configuration configuration, Closer closer) {

        RoleStatisticsProvider roleStatisticsProvider = new RoleStatisticsProvider(statisticsProvider, "remote_persistence");

        DiskCacheIOMonitor diskCacheIOMonitor = new DiskCacheIOMonitor(roleStatisticsProvider);
        RedisCacheIOMonitor redisCacheIOMonitor = new RedisCacheIOMonitor(roleStatisticsProvider);

        if (configuration.diskCacheEnabled()) {
            PersistentDiskCache persistentDiskCache = new PersistentDiskCache(
                    new File(configuration.diskCacheDirectory()),
                    configuration.diskCacheMaxSizeMB(),
                    diskCacheIOMonitor);
            closer.register(persistentDiskCache);
            registerCacheStatsMBean(persistentDiskCache.getCacheStats());

            if (configuration.redisCacheEnabled()) {
                PersistentRedisCache redisCache = createRedisCache(configuration, redisCacheIOMonitor);
                persistentDiskCache.linkWith(redisCache);
                closer.register(redisCache);
                registerCacheStatsMBean(redisCache.getCacheStats());
            }

            return persistentDiskCache;
        } else if (configuration.redisCacheEnabled()) {
            PersistentRedisCache redisCache = createRedisCache(configuration, redisCacheIOMonitor);
            closer.register(redisCache);
            registerCacheStatsMBean(redisCache.getCacheStats());

            return redisCache;
        }

        // Neither tier enabled: no cache to provide.
        return null;
    }
}
diff --git
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.jackrabbit.oak.segment.remote.persistentcache;

import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.AbstractPersistentCache;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
 * Base test for {@link AbstractPersistentCache} implementations. Subclasses
 * assign {@link #persistentCache} in their set-up; the tests here exercise
 * concurrent writes and reads of many segments, lookups of missing and
 * existing segments, and concurrent writes of the same segment.
 */
public abstract class AbstractPersistentCacheTest {

    protected static final int SEGMENTS = 1000;
    protected static final int THREADS = 50;
    protected static final int SEGMENTS_PER_THREAD = SEGMENTS / THREADS;
    // Maximum number of 100ms polls in waitWhile before assertNoTimeout fails.
    protected static final int TIMEOUT_COUNT = 50;

    protected static final Executor executor = Executors.newFixedThreadPool(THREADS);

    /**
     * Runs the given (threadIndex, segmentIndex) action on THREADS pool
     * threads, each thread covering its own contiguous slice of
     * SEGMENTS_PER_THREAD segment indices.
     */
    protected static final Consumer<BiConsumer<Integer, Integer>> runConcurrently = r -> {
        for (int i = 0; i < THREADS; ++i) {
            int finalI = i;
            executor.execute(() -> {
                for (int j = finalI * SEGMENTS_PER_THREAD; j < (finalI + 1) * SEGMENTS_PER_THREAD; ++j) {
                    r.accept(finalI, j);
                }
            });
        }
    };

    // Cache under test; provided by the concrete subclass.
    protected AbstractPersistentCache persistentCache;

    final AtomicInteger errors = new AtomicInteger(0);
    final AtomicInteger done = new AtomicInteger(0);
    int count; // for checking timeouts

    /** Polls the condition every 100ms until it turns false or TIMEOUT_COUNT is hit. */
    protected Consumer<Supplier<Boolean>> waitWhile = (r -> {
        for (count = 0; r.get() && count < TIMEOUT_COUNT; ++count) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Keep polling, but preserve the thread's interrupt status.
                Thread.currentThread().interrupt();
            }
        }
    });

    @Test
    public void writeAndReadManySegments() throws Exception {
        final List<TestSegment> testSegments = new ArrayList<>(SEGMENTS);
        final List<Map<String, Buffer>> segmentsRead = new ArrayList<>(THREADS);

        for (int i = 0; i < SEGMENTS; ++i) {
            testSegments.add(TestSegment.createSegment());
        }

        // One result map per thread so readers never share mutable state.
        for (int i = 0; i < THREADS; ++i) {
            final Map<String, Buffer> segmentsReadThisThread = new HashMap<>(SEGMENTS_PER_THREAD);
            segmentsRead.add(segmentsReadThisThread);
        }

        runConcurrently.accept((nThread, nSegment) -> {
            TestSegment segment = testSegments.get(nSegment);
            long[] id = segment.getSegmentId();
            try {
                persistentCache.writeSegment(id[0], id[1], segment.getSegmentBuffer());
            } catch (Throwable t) {
                errors.incrementAndGet();
            } finally {
                done.incrementAndGet();
            }
        });

        waitWhile.accept(() -> done.get() < SEGMENTS);
        waitWhile.accept(() -> persistentCache.getWritesPending() > 0);

        assertEquals("Errors have occurred while writing", 0, errors.get());
        assertNoTimeout();

        done.set(0);
        runConcurrently.accept((nThread, nSegment) -> {
            final Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(nThread);
            final TestSegment segment = testSegments.get(nSegment);
            final long[] id = segment.getSegmentId();
            try {
                final Buffer segmentRead = persistentCache.readSegment(id[0], id[1], () -> null);
                segmentsReadThisThread.put(new UUID(id[0], id[1]).toString(), segmentRead);
            } finally {
                done.incrementAndGet();
            }
        });

        waitWhile.accept(() -> done.get() < SEGMENTS);

        assertNoTimeout();
        assertEquals("Errors have occurred while reading", 0, errors.get());

        // Every written segment must come back byte-identical.
        for (int i = 0; i < THREADS; ++i) {
            for (int j = i * SEGMENTS_PER_THREAD; j < (i + 1) * SEGMENTS_PER_THREAD; ++j) {
                TestSegment testSegment = testSegments.get(j);
                Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(i);
                long[] segmentReadId = testSegment.getSegmentId();
                Buffer segmentRead = segmentsReadThisThread.get(new UUID(segmentReadId[0], segmentReadId[1]).toString());
                if (segmentRead == null) {
                    errors.incrementAndGet();
                    continue;
                }
                assertSegmentBufferEquals(testSegment.getSegmentBuffer(), segmentRead);
            }
        }
        assertEquals("Segment(s) not found in cache", 0, errors.get());
    }

    @Test
    public void testNonExisting() throws Exception {
        final Random random = new Random();
        final long[] segmentIds = random.longs(2 * SEGMENTS).toArray();
        final AtomicInteger containsFailures = new AtomicInteger(0);
        final AtomicInteger readFailures = new AtomicInteger(0);

        runConcurrently.accept((nThread, nSegment) -> {
            try {
                long msb = segmentIds[2 * nSegment];
                long lsb = segmentIds[2 * nSegment + 1];
                if (persistentCache.containsSegment(msb, lsb)) {
                    containsFailures.incrementAndGet();
                }
                if (persistentCache.readSegment(msb, lsb, () -> null) != null) {
                    readFailures.incrementAndGet();
                }
            } catch (Throwable t) {
                errors.incrementAndGet();
            } finally {
                done.incrementAndGet();
            }
        });

        waitWhile.accept(() -> done.get() < SEGMENTS);

        assertEquals("exceptions occurred", 0, errors.get());
        assertNoTimeout();
        assertEquals("containsSegment failed", 0, containsFailures.get());
        assertEquals("readSegment failed", 0, readFailures.get());
    }

    @Test
    public void testExisting() throws Exception {
        final TestSegment testSegment = TestSegment.createSegment();
        final long[] segmentId = testSegment.getSegmentId();
        persistentCache.writeSegment(segmentId[0], segmentId[1], testSegment.getSegmentBuffer());
        final AtomicInteger containsFailures = new AtomicInteger(0);
        final AtomicInteger readFailures = new AtomicInteger(0);

        // We need this to give the cache's write thread pool time to start the thread
        Thread.sleep(1000);

        waitWhile.accept(() -> persistentCache.getWritesPending() > 0);
        assertNoTimeout();
        assertEquals(0, persistentCache.getWritesPending());

        runConcurrently.accept((nThread, nSegment) -> {
            try {
                if (!persistentCache.containsSegment(segmentId[0], segmentId[1])) {
                    containsFailures.incrementAndGet();
                }
                if (persistentCache.readSegment(segmentId[0], segmentId[1], () -> null) == null) {
                    readFailures.incrementAndGet();
                }
            } catch (Throwable t) {
                errors.incrementAndGet();
            } finally {
                done.incrementAndGet();
            }
        });

        waitWhile.accept(() -> done.get() < SEGMENTS);

        assertEquals("Exceptions occurred", 0, errors.get());
        assertNoTimeout();
        assertEquals("containsSegment failed", 0, containsFailures.get());
        assertEquals("readSegment failed", 0, readFailures.get());
    }

    @Test
    public void testConcurrentWritesSameSegment() throws Exception {
        final TestSegment testSegment = TestSegment.createSegment();
        long[] segmentId = testSegment.getSegmentId();

        runConcurrently.accept((nThread, nSegment) -> {
            try {
                persistentCache.writeSegment(segmentId[0], segmentId[1], testSegment.getSegmentBuffer());
            } catch (Throwable t) {
                errors.incrementAndGet();
            } finally {
                done.incrementAndGet();
            }
        });

        waitWhile.accept(() -> done.get() < SEGMENTS);

        Buffer segmentRead = persistentCache.readSegment(segmentId[0], segmentId[1], () -> null);
        assertNotNull("The segment was not found", segmentRead);
        assertSegmentBufferEquals(testSegment.getSegmentBuffer(), segmentRead);
    }

    /** Random test fixture: a segment id (two longs) plus a random payload. */
    protected static class TestSegment {
        // NOTE(review): 2 * Long.SIZE is 128 (Long.SIZE is in bits), while only
        // the first 16 bytes are ever read by getSegmentId(); this was likely
        // meant to be 2 * Long.BYTES. Kept as-is — the extra random bytes are
        // harmless. TODO confirm and shrink.
        public static int UUID_LEN = 2 * Long.SIZE;
        public static int SEGMENT_LEN = 256 * 1024;

        private static final Random random = new Random();

        private final byte[] segmentId;
        private final byte[] segmentBytes;

        protected static TestSegment createSegment() {
            return new TestSegment(createSegmentIdBytes(), createSegmentBytes());
        }

        private static byte[] createSegmentBytes() {
            byte[] ret = new byte[SEGMENT_LEN];
            random.nextBytes(ret);
            return ret;
        }

        private static byte[] createSegmentIdBytes() {
            byte[] ret = new byte[UUID_LEN];
            random.nextBytes(ret);
            return ret;
        }

        /** Returns {msb, lsb} decoded from the first 16 bytes of the id. */
        protected long[] getSegmentId() {
            final Buffer buffer = Buffer.allocate(segmentId.length);
            buffer.put(segmentId);
            long[] ret = new long[2];
            ret[0] = buffer.getLong(0);
            ret[1] = buffer.getLong(8);
            return ret;
        }

        protected Buffer getSegmentBuffer() {
            return Buffer.wrap(segmentBytes);
        }

        private TestSegment(byte[] segmentId, byte[] segmentBytes) {
            this.segmentId = segmentId;
            this.segmentBytes = segmentBytes;
        }

        protected byte[] getSegmentBytes() {
            return segmentBytes;
        }
    }

    /** Asserts the two buffers hold identical SEGMENT_LEN payloads. */
    protected static void assertSegmentBufferEquals(Buffer expected, Buffer actual) {
        expected.rewind();
        actual.rewind();
        assertEquals("Segment size is different", TestSegment.SEGMENT_LEN, actual.remaining());
        for (int i = 0; i < TestSegment.SEGMENT_LEN; ++i) {
            assertEquals("Difference in byte buffer", expected.get(i), actual.get(i));
        }
    }

    /** Fails when the most recent waitWhile gave up after TIMEOUT_COUNT polls. */
    protected void assertNoTimeout() {
        assertTrue("Wait timeout reached", count < TIMEOUT_COUNT);
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.jackrabbit.oak.segment.remote.persistentcache;

import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

/**
 * Tests {@link PersistentDiskCache} through the shared scenarios in
 * {@link AbstractPersistentCacheTest}, plus cache eviction (cleanUp) and the
 * IOMonitor callbacks on reads.
 */
public class PersistentDiskCacheTest extends AbstractPersistentCacheTest {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder(new File("target"));

    @Before
    public void setUp() throws Exception {
        persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 10 * 1024, new IOMonitorAdapter());
    }

    /**
     * With a 0 MB size limit, cleanUp must evict every written segment:
     * afterwards no segment may still be readable from the cache.
     */
    @Test
    public void cleanupTest() throws Exception {
        persistentCache.close();
        // Size limit 0 so that every segment is a cleanup candidate.
        persistentCache = new PersistentDiskCache(temporaryFolder.newFolder(), 0, new IOMonitorAdapter());
        final List<TestSegment> testSegments = new ArrayList<>(SEGMENTS);
        final List<Map<String, Buffer>> segmentsRead = new ArrayList<>(THREADS);

        for (int i = 0; i < SEGMENTS; ++i) {
            testSegments.add(TestSegment.createSegment());
        }

        for (int i = 0; i < THREADS; ++i) {
            final Map<String, Buffer> segmentsReadThisThread = new HashMap<>(SEGMENTS_PER_THREAD);
            segmentsRead.add(segmentsReadThisThread);
        }

        runConcurrently.accept((nThread, nSegment) -> {
            TestSegment segment = testSegments.get(nSegment);
            long[] id = segment.getSegmentId();
            try {
                persistentCache.writeSegment(id[0], id[1], segment.getSegmentBuffer());
            } catch (Throwable t) {
                errors.incrementAndGet();
            } finally {
                done.incrementAndGet();
            }
        });

        waitWhile.accept(() -> done.get() < SEGMENTS);
        waitWhile.accept(() -> persistentCache.getWritesPending() > 0);

        assertEquals("Errors have occurred while writing", 0, errors.get());
        assertNoTimeout();

        done.set(0);
        waitWhile.accept(() -> ((PersistentDiskCache) persistentCache).cleanupInProgress.get());

        persistentCache.cleanUp();

        runConcurrently.accept((nThread, nSegment) -> {
            final TestSegment segment = testSegments.get(nSegment);
            final long[] id = segment.getSegmentId();
            try {
                final Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(nThread);
                final Buffer segmentRead = persistentCache.readSegment(id[0], id[1], () -> null);
                segmentsReadThisThread.put(new UUID(id[0], id[1]).toString(), segmentRead);
            } catch (Throwable t) {
                errors.incrementAndGet();
            } finally {
                done.incrementAndGet();
            }
        });

        waitWhile.accept(() -> done.get() < SEGMENTS);

        assertNoTimeout();
        assertEquals("Errors have occurred while reading", 0, errors.get());
        errors.set(0);

        // Count surviving segments: a non-null read means eviction missed it.
        for (int i = 0; i < THREADS; ++i) {
            for (int j = i * SEGMENTS_PER_THREAD; j < (i + 1) * SEGMENTS_PER_THREAD; ++j) {
                TestSegment testSegment = testSegments.get(j);
                Map<String, Buffer> segmentsReadThisThread = segmentsRead.get(i);
                long[] segmentReadId = testSegment.getSegmentId();
                Buffer segmentRead = segmentsReadThisThread.get(new UUID(segmentReadId[0], segmentReadId[1]).toString());
                if (segmentRead == null) {
                    errors.incrementAndGet();
                }
            }
        }
        // errors counts evicted segments; all SEGMENTS must have been evicted.
        assertEquals("Segment(s) not cleaned up in cache", 0, SEGMENTS - errors.get());
    }

    /**
     * The IOMonitor must stay silent for cache misses and fire exactly once
     * around a successful read.
     */
    @Test
    public void testIOMonitor() throws IOException {
        IOMonitorAdapter ioMonitorAdapter = Mockito.mock(IOMonitorAdapter.class);

        persistentCache.close();
        File cacheFolder = temporaryFolder.newFolder();
        persistentCache = new PersistentDiskCache(cacheFolder, 0, ioMonitorAdapter);

        UUID segmentUUID = UUID.randomUUID();

        persistentCache.readSegment(segmentUUID.getMostSignificantBits(), segmentUUID.getLeastSignificantBits(), () -> null);

        //Segment not in cache, monitor methods not invoked
        verify(ioMonitorAdapter, never()).beforeSegmentRead(any(), anyLong(), anyLong(), anyInt());
        verify(ioMonitorAdapter, never()).afterSegmentRead(any(), anyLong(), anyLong(), anyInt(), anyLong());

        //place segment in disk cache
        File segmentFile = new File(cacheFolder, segmentUUID.toString());
        assertTrue("Could not create segment file", segmentFile.createNewFile());

        persistentCache.readSegment(segmentUUID.getMostSignificantBits(), segmentUUID.getLeastSignificantBits(), () -> null);

        verify(ioMonitorAdapter, times(1)).beforeSegmentRead(eq(segmentFile), eq(segmentUUID.getMostSignificantBits()), eq(segmentUUID.getLeastSignificantBits()), anyInt());
        verify(ioMonitorAdapter, times(1)).afterSegmentRead(eq(segmentFile), eq(segmentUUID.getMostSignificantBits()), eq(segmentUUID.getLeastSignificantBits()), anyInt(), anyLong());
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.jackrabbit.oak.segment.remote.persistentcache;

import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import redis.embedded.RedisServer;

import java.io.IOException;
import java.util.UUID;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

/**
 * Tests {@link PersistentRedisCache} against an embedded Redis server,
 * running the shared scenarios from {@link AbstractPersistentCacheTest} plus
 * a check of the IOMonitor read callbacks.
 */
public class PersistentRedisCacheTest extends AbstractPersistentCacheTest {

    // Embedded Redis instance started per test on a free port.
    private RedisServer redisServer;
    private IOMonitorAdapter ioMonitorAdapter;

    @Before
    public void setUp() throws Exception {
        redisServer = RedisServer.builder().build();
        redisServer.start();
        int port = redisServer.ports().get(0);
        ioMonitorAdapter = Mockito.mock(IOMonitorAdapter.class);

        // Argument order mirrors the PersistentRedisCache constructor as used
        // by RemotePersistentCacheService: host, port, expire seconds, socket
        // timeout, connection timeout, min/max/max-total connections, DB
        // index, IO monitor. -1 presumably disables expiry — TODO confirm.
        persistentCache = new PersistentRedisCache(
                "localhost",
                port,
                -1,
                10000,
                50,
                10,
                2000,
                200000,
                0,
                ioMonitorAdapter
        );
    }

    @After
    public void tearDown() throws Exception {
        redisServer.stop();
    }

    /**
     * The IOMonitor must not fire for a cache miss, and must fire exactly once
     * after a successful read of a previously written segment.
     */
    @Test
    public void testIOMonitor() throws IOException, InterruptedException {

        UUID segmentUUID = UUID.randomUUID();
        long msb = segmentUUID.getMostSignificantBits();
        long lsb = segmentUUID.getLeastSignificantBits();

        persistentCache.readSegment(msb, lsb, () -> null);

        //Segment not in cache, monitor methods not invoked
        verify(ioMonitorAdapter, never()).afterSegmentRead(any(), anyLong(), anyLong(), anyInt(), anyLong());

        persistentCache.writeSegment(msb, lsb, Buffer.wrap("segment_content".getBytes()));

        // Give the asynchronous write time to reach Redis before reading back;
        // writes are queued (see getWritesPending in the base test class).
        Thread.sleep(300);

        persistentCache.readSegment(msb, lsb, () -> null);

        verify(ioMonitorAdapter, times(1)).afterSegmentRead(any(), eq(msb), eq(lsb), anyInt(), anyLong());
    }

}
import org.apache.jackrabbit.oak.spi.state.NodeStoreProvider; import org.apache.jackrabbit.oak.spi.whiteboard.Registration; @@ -253,6 +255,12 @@ public class SegmentNodeStoreFactory { ) boolean splitPersistence() default false; + @AttributeDefinition( + name = "Cache persistence", + description = "Boolean value indicating that the persisted cache should be used for the custom segment store" + ) + boolean cachePersistence() default false; + @AttributeDefinition( name = "Backup directory", description = "Directory (relative to current working directory) for storing repository backups. " + @@ -315,6 +323,13 @@ public class SegmentNodeStoreFactory { ) private volatile SegmentNodeStorePersistence segmentStore; + @Reference( + cardinality = ReferenceCardinality.OPTIONAL, + policy = ReferencePolicy.STATIC, + policyOption = ReferencePolicyOption.GREEDY + ) + private volatile PersistentCache persistentCache; + @Reference private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP; @@ -334,6 +349,7 @@ public class SegmentNodeStoreFactory { configuration, blobStore, segmentStore, + persistentCache, getRoleStatisticsProvider(statisticsProvider, configuration.role()), registrations, whiteboard, @@ -376,6 +392,7 @@ public class SegmentNodeStoreFactory { Configuration configuration, BlobStore blobStore, SegmentNodeStorePersistence segmentStore, + PersistentCache persistentCache, StatisticsProvider statisticsProvider, Closer closer, Whiteboard whiteboard, @@ -538,6 +555,11 @@ public class SegmentNodeStoreFactory { return configuration.splitPersistence(); } + @Override + public boolean hasCachePersistence() { + return configuration.cachePersistence(); + } + @Override public boolean registerDescriptors() { return configuration.registerDescriptors(); @@ -606,6 +628,11 @@ public class SegmentNodeStoreFactory { return segmentStore; } + @Override + public PersistentCache getPersistentCache() { + return persistentCache; + } + @Override public BundleContext getBundleContext() 
{ return context.getBundleContext(); @@ -615,51 +642,6 @@ public class SegmentNodeStoreFactory { } private static StatisticsProvider getRoleStatisticsProvider(StatisticsProvider delegate, String role) { - RepositoryStatistics repositoryStatistics = new RepositoryStatistics() { - - @Override - public TimeSeries getTimeSeries(Type type) { - return getTimeSeries(type.name(), type.isResetValueEachSecond()); - } - - @Override - public TimeSeries getTimeSeries(String type, boolean resetValueEachSecond) { - return delegate.getStats().getTimeSeries(addRoleToName(type, role), resetValueEachSecond); - } - }; - - return new StatisticsProvider() { - - @Override - public RepositoryStatistics getStats() { - return repositoryStatistics; - } - - @Override - public MeterStats getMeter(String name, StatsOptions options) { - return delegate.getMeter(addRoleToName(name, role), options); - } - - @Override - public CounterStats getCounterStats(String name, StatsOptions options) { - return delegate.getCounterStats(addRoleToName(name, role), options); - } - - @Override - public TimerStats getTimer(String name, StatsOptions options) { - return delegate.getTimer(addRoleToName(name, role), options); - } - - @Override - public HistogramStats getHistogram(String name, StatsOptions options) { - return delegate.getHistogram(addRoleToName(name, role), options); - } - - }; + return new RoleStatisticsProvider(delegate, role); } - - private static String addRoleToName(String name, String role) { - return role + '.' 
+ name; - } - } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java index 69fb62a90a..38be06ecd4 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreRegistrar.java @@ -25,14 +25,8 @@ import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.RETA import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder; import static org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo.getOrCreateId; -import java.io.Closeable; -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - import com.google.common.io.Closer; + import org.apache.jackrabbit.commons.SimpleValueFactory; import org.apache.jackrabbit.oak.api.Descriptors; import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean; @@ -50,9 +44,17 @@ import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker; import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; import org.apache.jackrabbit.oak.segment.compaction.SegmentRevisionGC; import org.apache.jackrabbit.oak.segment.compaction.SegmentRevisionGCMBean; -import org.apache.jackrabbit.oak.segment.file.*; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; +import org.apache.jackrabbit.oak.segment.file.FileStoreGCMonitor; +import org.apache.jackrabbit.oak.segment.file.FileStoreStatsMBean; +import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; +import org.apache.jackrabbit.oak.segment.file.MetricsIOMonitor; +import org.apache.jackrabbit.oak.segment.file.MetricsRemoteStoreMonitor; import 
org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.CachingPersistence; +import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; import org.apache.jackrabbit.oak.segment.spi.persistence.split.SplitPersistence; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; @@ -76,6 +78,13 @@ import org.osgi.framework.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + class SegmentNodeStoreRegistrar { static SegmentNodeStore registerSegmentNodeStore(Configuration cfg) throws IOException { @@ -136,6 +145,8 @@ class SegmentNodeStoreRegistrar { boolean hasSplitPersistence(); + boolean hasCachePersistence(); + boolean registerDescriptors(); boolean dispatchChanges(); @@ -160,6 +171,8 @@ class SegmentNodeStoreRegistrar { SegmentNodeStorePersistence getSegmentNodeStorePersistence(); + PersistentCache getPersistentCache(); + BundleContext getBundleContext(); } @@ -231,16 +244,22 @@ class SegmentNodeStoreRegistrar { } if (cfg.hasCustomSegmentStore() && cfg.getSegmentNodeStorePersistence() != null) { + SegmentNodeStorePersistence customPersistence = cfg.getSegmentNodeStorePersistence(); + + if (cfg.hasCachePersistence()) { + cfg.getLogger().info("Using persistent cache for the custom persistence [{}]", customPersistence); + customPersistence = new CachingPersistence(cfg.getPersistentCache(), customPersistence); + } + if (cfg.hasSplitPersistence()) { - cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}] and local writes", cfg.getSegmentNodeStorePersistence()); + cfg.getLogger().info("Initializing 
SegmentNodeStore with custom persistence [{}] and local writes", customPersistence); cfg.getSplitPersistenceDirectory().mkdirs(); - SegmentNodeStorePersistence roPersistence = cfg.getSegmentNodeStorePersistence(); SegmentNodeStorePersistence rwPersistence = new TarPersistence(cfg.getSplitPersistenceDirectory()); - SegmentNodeStorePersistence persistence = new SplitPersistence(roPersistence, rwPersistence); + SegmentNodeStorePersistence persistence = new SplitPersistence(customPersistence, rwPersistence); builder.withCustomPersistence(persistence); } else { - cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}]", cfg.getSegmentNodeStorePersistence()); - builder.withCustomPersistence(cfg.getSegmentNodeStorePersistence()); + cfg.getLogger().info("Initializing SegmentNodeStore with custom persistence [{}]", customPersistence); + builder.withCustomPersistence(customPersistence); } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java index d883482e64..dd7b1298f3 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java @@ -38,13 +38,12 @@ import static org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions.SIZE import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.DEFAULT_MAX_FILE_SIZE; import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.ONLY_STANDALONE_TARGET; -import java.io.File; -import java.io.IOException; - import com.google.common.io.Closer; + import org.apache.jackrabbit.oak.osgi.OsgiUtil; import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import 
org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; import org.apache.jackrabbit.oak.stats.StatisticsProvider; @@ -64,6 +63,9 @@ import org.osgi.service.metatype.annotations.ObjectClassDefinition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; + /** * An OSGi wrapper for the segment node store. */ @@ -251,6 +253,12 @@ public class SegmentNodeStoreService { ) boolean splitPersistence() default false; + @AttributeDefinition( + name = "Cache persistence", + description = "Boolean value indicating that the persisted cache should be used for the custom segment store" + ) + boolean cachePersistence() default false; + @AttributeDefinition( name = "Backup directory", description = "Directory (relative to current working directory) for storing repository backups. " + @@ -295,6 +303,13 @@ public class SegmentNodeStoreService { ) private volatile SegmentNodeStorePersistence segmentStore; + @Reference( + cardinality = ReferenceCardinality.OPTIONAL, + policy = ReferencePolicy.STATIC, + policyOption = ReferencePolicyOption.GREEDY + ) + private volatile PersistentCache persistentCache; + @Reference private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP; @@ -303,7 +318,7 @@ public class SegmentNodeStoreService { @Activate public void activate(ComponentContext context, Configuration configuration) throws IOException { OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext()); - registerSegmentStore(context, configuration, blobStore, segmentStore, statisticsProvider, closer, whiteboard, log); + registerSegmentStore(context, configuration, blobStore, segmentStore, persistentCache, statisticsProvider, closer, whiteboard, log); } private static SegmentNodeStore registerSegmentStore( @@ -311,6 +326,7 @@ public class SegmentNodeStoreService { 
Configuration configuration, BlobStore blobStore, SegmentNodeStorePersistence segmentStore, + PersistentCache persistentCache, StatisticsProvider statisticsProvider, Closer closer, Whiteboard whiteboard, @@ -488,6 +504,11 @@ public class SegmentNodeStoreService { return configuration.splitPersistence(); } + @Override + public boolean hasCachePersistence() { + return configuration.cachePersistence(); + } + @Override public boolean registerDescriptors() { return true; @@ -556,6 +577,11 @@ public class SegmentNodeStoreService { return segmentStore; } + @Override + public PersistentCache getPersistentCache() { + return persistentCache; + } + @Override public BundleContext getBundleContext() { return context.getBundleContext(); diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/monitor/RoleStatisticsProvider.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/monitor/RoleStatisticsProvider.java new file mode 100644 index 0000000000..2b1c2c94e7 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/monitor/RoleStatisticsProvider.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.jackrabbit.oak.segment.spi.monitor; + +import org.apache.jackrabbit.api.stats.RepositoryStatistics; +import org.apache.jackrabbit.api.stats.TimeSeries; +import org.apache.jackrabbit.oak.stats.CounterStats; +import org.apache.jackrabbit.oak.stats.HistogramStats; +import org.apache.jackrabbit.oak.stats.MeterStats; +import org.apache.jackrabbit.oak.stats.StatisticsProvider; +import org.apache.jackrabbit.oak.stats.StatsOptions; +import org.apache.jackrabbit.oak.stats.TimerStats; + +public class RoleStatisticsProvider implements StatisticsProvider{ + + private final StatisticsProvider delegate; + private final String role; + private final RepositoryStatistics repositoryStatistics; + + public RoleStatisticsProvider(StatisticsProvider delegate, String role) { + this.delegate = delegate; + this.role = role; + + this.repositoryStatistics = new RepositoryStatistics() { + + @Override + public TimeSeries getTimeSeries(Type type) { + return getTimeSeries(type.name(), type.isResetValueEachSecond()); + } + + @Override + public TimeSeries getTimeSeries(String type, boolean resetValueEachSecond) { + return delegate.getStats().getTimeSeries(addRoleToName(type, role), resetValueEachSecond); + } + }; + } + + @Override + public RepositoryStatistics getStats() { + return repositoryStatistics; + } + + @Override + public MeterStats getMeter(String name, StatsOptions options) { + return delegate.getMeter(addRoleToName(name, role), options); + } + + @Override + public CounterStats getCounterStats(String name, StatsOptions options) { + return delegate.getCounterStats(addRoleToName(name, role), options); + } + + @Override + public TimerStats getTimer(String name, StatsOptions options) { + return delegate.getTimer(addRoleToName(name, role), options); + } + + @Override + public HistogramStats getHistogram(String name, StatsOptions options) { + return delegate.getHistogram(addRoleToName(name, role), options); + } + + private static String addRoleToName(String name, 
String role) { + return role + '.' + name; + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java new file mode 100644 index 0000000000..4c467f9c25 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/AbstractPersistentCache.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; + +import static java.lang.Thread.currentThread; +import static java.util.concurrent.TimeUnit.SECONDS; + +import com.google.common.base.Stopwatch; + +import org.apache.jackrabbit.oak.cache.AbstractCacheStats; +import org.apache.jackrabbit.oak.commons.Buffer; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.HashSet; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +public abstract class AbstractPersistentCache implements PersistentCache, Closeable { + private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentCache.class); + + public static final int THREADS = Integer.getInteger("oak.segment.cache.threads", 10); + + protected ExecutorService executor; + protected AtomicLong cacheSize = new AtomicLong(0); + protected PersistentCache nextCache; + protected final HashSet writesPending; + + protected SegmentCacheStats segmentCacheStats; + + public AbstractPersistentCache() { + executor = Executors.newFixedThreadPool(THREADS); + writesPending = new HashSet<>(); + } + + public PersistentCache linkWith(AbstractPersistentCache nextCache) { + this.nextCache = nextCache; + return nextCache; + } + + @Override + public Buffer readSegment(long msb, long lsb, @NotNull Callable loader) { + Buffer segment = readSegmentInternal(msb, lsb); + if (segment != null) { + segmentCacheStats.hitCount.incrementAndGet(); + return segment; + } + segmentCacheStats.missCount.incrementAndGet(); + + // Either use the next cache or the 'loader' + Callable nextLoader = nextCache != null + ? 
() -> nextCache.readSegment(msb, lsb, loader) + : loader; + + Stopwatch stopwatch = Stopwatch.createStarted(); + try { + segment = nextLoader.call(); + + if (segment != null) { + recordCacheLoadTimeInternal(stopwatch.elapsed(TimeUnit.NANOSECONDS), true); + writeSegment(msb, lsb, segment); + } + + return segment; + } catch (Exception t) { + logger.error("Exception while loading segment {} from remote store or linked cache", new UUID(msb, lsb), t); + recordCacheLoadTimeInternal(stopwatch.elapsed(TimeUnit.NANOSECONDS), false); + } + return segment; + } + + /** + * Reads the segment from the cache. + * If segment is not found, this method does not query next cache that was set with {@link #linkWith(AbstractPersistentCache)} + * + * @param msb the most significant bits of the identifier of the segment + * @param lsb the least significant bits of the identifier of the segment + * @return byte buffer containing the segment data or null if the segment doesn't exist + */ + protected abstract Buffer readSegmentInternal(long msb, long lsb); + + /** + * Records time spent to load data from external source, after cache miss. + * + * @param loadTime load time in nanoseconds + * @param successful indicates whether loading of the segment into cache was successful + */ + protected final void recordCacheLoadTimeInternal(long loadTime, boolean successful) { + if (successful) { + segmentCacheStats.loadSuccessCount.incrementAndGet(); + } else { + segmentCacheStats.loadExceptionCount.incrementAndGet(); + } + segmentCacheStats.loadTime.addAndGet(loadTime); + } + + /** + * @return Statistics for this cache. 
+ */ + @NotNull + public AbstractCacheStats getCacheStats() { + return segmentCacheStats; + } + + @Override + public void close() { + try { + executor.shutdown(); + if (executor.awaitTermination(60, SECONDS)) { + logger.debug("The persistent cache scheduler was successfully shut down"); + } else { + logger.warn("The persistent cache scheduler takes too long to shut down"); + } + } catch (InterruptedException e) { + logger.warn("Interrupt while shutting down the persistent cache scheduler", e); + currentThread().interrupt(); + } + } + + public int getWritesPending() { + synchronized (writesPending) { + return writesPending.size(); + } + } + + protected boolean lockSegmentWrite(String segmentId) { + synchronized (writesPending) { + return writesPending.add(segmentId); + } + } + + protected void unlockSegmentWrite(String segmentId) { + synchronized (writesPending) { + writesPending.remove(segmentId); + } + } +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingArchiveManager.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingArchiveManager.java new file mode 100644 index 0000000000..ae8023361e --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingArchiveManager.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; + +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +public class CachingArchiveManager implements SegmentArchiveManager { + + private final SegmentArchiveManager delegate; + + private final PersistentCache persistentCache; + + public CachingArchiveManager(PersistentCache persistentCache, SegmentArchiveManager delegate) { + this.delegate = delegate; + this.persistentCache = persistentCache; + } + + @Override + public @NotNull List listArchives() throws IOException { + return delegate.listArchives(); + } + + @Override + public @Nullable SegmentArchiveReader open(@NotNull String archiveName) throws IOException { + return new CachingSegmentArchiveReader(persistentCache, delegate.open(archiveName)); + } + + @Override + public @Nullable SegmentArchiveReader forceOpen(String archiveName) throws IOException { + return new CachingSegmentArchiveReader(persistentCache, delegate.forceOpen(archiveName)); + } + + @Override + public @NotNull SegmentArchiveWriter create(@NotNull String archiveName) throws IOException { + return delegate.create(archiveName); + } + + @Override + 
public boolean delete(@NotNull String archiveName) { + return delegate.delete(archiveName); + } + + @Override + public boolean renameTo(@NotNull String from, @NotNull String to) { + return delegate.renameTo(from, to); + } + + @Override + public void copyFile(@NotNull String from, @NotNull String to) throws IOException { + delegate.copyFile(from, to); + } + + @Override + public boolean exists(@NotNull String archiveName) { + return delegate.exists(archiveName); + } + + @Override + public void recoverEntries(@NotNull String archiveName, @NotNull LinkedHashMap entries) throws IOException { + delegate.recoverEntries(archiveName, entries); + } + + @Override + public void backup(@NotNull String archiveName, @NotNull String backupArchiveName, + @NotNull Set recoveredEntries) throws IOException { + delegate.backup(archiveName, backupArchiveName, recoveredEntries); + } +} \ No newline at end of file diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistence.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistence.java new file mode 100644 index 0000000000..44a174f0f4 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistence.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; + +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor; +import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitor; +import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile; +import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; + +import java.io.IOException; + +public class CachingPersistence implements SegmentNodeStorePersistence { + + private final SegmentNodeStorePersistence delegate; + + private final PersistentCache persistentCache; + + public CachingPersistence(PersistentCache persistentCache, SegmentNodeStorePersistence delegate) { + this.delegate = delegate; + this.persistentCache = persistentCache; + } + + @Override + public SegmentArchiveManager createArchiveManager(boolean memoryMapping, boolean offHeapAccess, IOMonitor ioMonitor, + FileStoreMonitor fileStoreMonitor, RemoteStoreMonitor remoteStoreMonitor) throws IOException { + return new CachingArchiveManager(persistentCache, delegate.createArchiveManager(memoryMapping, offHeapAccess, ioMonitor, fileStoreMonitor, remoteStoreMonitor)); + } + + @Override + public boolean 
segmentFilesExist() { + return delegate.segmentFilesExist(); + } + + @Override + public JournalFile getJournalFile() { + return delegate.getJournalFile(); + } + + @Override + public GCJournalFile getGCJournalFile() throws IOException { + return delegate.getGCJournalFile(); + } + + @Override + public ManifestFile getManifestFile() throws IOException { + return delegate.getManifestFile(); + } + + @Override + public RepositoryLock lockRepository() throws IOException { + return delegate.lockRepository(); + } + +} \ No newline at end of file diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java new file mode 100644 index 0000000000..07e33c9020 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; + +import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.util.List; + +public class CachingSegmentArchiveReader implements SegmentArchiveReader { + + @NotNull + private final PersistentCache persistentCache; + + @NotNull + private final SegmentArchiveReader delegate; + + public CachingSegmentArchiveReader( + @NotNull PersistentCache persistentCache, + @NotNull SegmentArchiveReader delegate) { + this.persistentCache = persistentCache; + this.delegate = delegate; + } + + @Override + @Nullable + public Buffer readSegment(long msb, long lsb) throws IOException { + return persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb)); + } + + @Override + public boolean containsSegment(long msb, long lsb) { + if (persistentCache.containsSegment(msb, lsb)) { + return true; + } else { + return delegate.containsSegment(msb, lsb); + } + } + + @Override + public List listSegments() { + return delegate.listSegments(); + } + + @Override + @Nullable + public Buffer getGraph() throws IOException { + return delegate.getGraph(); + } + + @Override + public boolean hasGraph() { + return delegate.hasGraph(); + } + + @Override + @NotNull + public Buffer getBinaryReferences() throws IOException { + return delegate.getBinaryReferences(); + } + + @Override + public long length() { + return delegate.length(); + } + + @Override + @NotNull + public String getName() { + return delegate.getName(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public int getEntrySize(int size) { + return delegate.getEntrySize(size); + } +} \ No newline at end of file diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java new file mode 100644 index 0000000000..72fa0a8c8b --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCache.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; + +import org.apache.jackrabbit.oak.commons.Buffer; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.concurrent.Callable; + +/** + * This interface represents a cache which survives segment store restarts. + * The cache is agnostic to any archive structure. Segments are only + * identified by their UUIDs, specified as msb and lsb parts of the segment id. + */ +public interface PersistentCache { + + /** + * Reads the segment from cache. 
+ * + * @param msb the most significant bits of the identifier of the segment + * @param lsb the least significant bits of the identifier of the segment + * @param loader in case of a cache miss, the missing element will be retrieved by invoking {@code loader.call()} + * @return byte buffer containing the segment data or null if the segment doesn't exist + */ + @Nullable + Buffer readSegment(long msb, long lsb, @NotNull Callable loader); + + /** + * Check if the segment exists in the cache. + * + * @param msb the most significant bits of the identifier of the segment + * @param lsb the least significant bits of the identifier of the segment + * @return {@code true} if the segment exists + */ + boolean containsSegment(long msb, long lsb); + + /** + * Writes the segment to the cache. + * + * @param msb the most significant bits of the identifier of the segment + * @param lsb the least significant bits of the identifier of the segment + * @param buffer the byte buffer containing the segment data + */ + void writeSegment(long msb, long lsb, Buffer buffer); + + /** + * Purges the cache entries according to the implementation policy (e.g. maximum + * cache size, maximum number of entries, etc.) + */ + void cleanUp(); +} diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java new file mode 100644 index 0000000000..c94409966d --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/SegmentCacheStats.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; + +import com.google.common.cache.CacheStats; +import org.apache.jackrabbit.oak.cache.AbstractCacheStats; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class SegmentCacheStats extends AbstractCacheStats { + private final @NotNull Supplier maximumWeight; + + @NotNull + private final Supplier elementCount; + + @NotNull + final Supplier currentWeight; + + @NotNull + final AtomicLong loadSuccessCount = new AtomicLong(); + + @NotNull + final AtomicInteger loadExceptionCount = new AtomicInteger(); + + @NotNull + final AtomicLong loadTime = new AtomicLong(); + + @NotNull + final Supplier evictionCount; + + @NotNull + final AtomicLong hitCount = new AtomicLong(); + + @NotNull + final AtomicLong missCount = new AtomicLong(); + + public SegmentCacheStats(@NotNull String name, + @NotNull Supplier maximumWeight, + @NotNull Supplier elementCount, + @NotNull Supplier currentWeight, + @NotNull Supplier evictionCount) { + super(name); + this.maximumWeight = maximumWeight; + this.elementCount = checkNotNull(elementCount); + this.currentWeight = checkNotNull(currentWeight); + this.evictionCount = evictionCount; + } 
+ + @Override + protected CacheStats getCurrentStats() { + return new CacheStats( + hitCount.get(), + missCount.get(), + loadSuccessCount.get(), + loadExceptionCount.get(), + loadTime.get(), + evictionCount.get() + ); + } + + @Override + public long getElementCount() { + return elementCount.get(); + } + + @Override + public long getMaxTotalWeight() { + return maximumWeight.get(); + } + + @Override + public long estimateCurrentWeight() { + return currentWeight.get(); + } +} + diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java new file mode 100644 index 0000000000..20973b5cd7 --- /dev/null +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +@Internal(since = "1.0.0") +@Version("1.0.0") +package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; + +import org.apache.jackrabbit.oak.commons.annotations.Internal; +import org.osgi.annotation.versioning.Version; \ No newline at end of file diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCacheStatsTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCacheStatsTest.java new file mode 100644 index 0000000000..df2013c26f --- /dev/null +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/PersistentCacheStatsTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; + +import org.apache.jackrabbit.oak.commons.Buffer; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +public class PersistentCacheStatsTest { + + @Test + public void testCacheStats() { + AbstractPersistentCache cache = new PersistentCacheImpl(); + + cache.writeSegment(1, 1, Buffer.wrap(new byte[]{1})); + + //segment in cache + Buffer segment = cache.readSegment(1, 1, () -> null); + assertNotNull(segment); + + assertEquals(1, cache.getCacheStats().getHitCount()); + long loadTime = cache.getCacheStats().getTotalLoadTime(); + assertEquals(0, loadTime); + + + //segment not in cache but loaded from remote store + segment = cache.readSegment(0, 0, () -> Buffer.wrap(new byte[]{0})); + assertNotNull(segment); + assertEquals(1, cache.getCacheStats().getMissCount()); + loadTime = cache.getCacheStats().getTotalLoadTime(); + assertTrue(loadTime > 0); + + //segment not in cache and exception while loading from remote store + segment = cache.readSegment(2, 2, () -> { + throw new Exception(); + }); + assertNull(segment); + long loadTime2 = cache.getCacheStats().getTotalLoadTime(); + assertTrue(loadTime2 > loadTime); + assertEquals(2, cache.getCacheStats().getMissCount()); + assertEquals(1, cache.getCacheStats().getLoadExceptionCount()); + + cache.close(); + } + + @Test + public void testCacheStatsForLinkedCaches() { + AbstractPersistentCache cache1 = new PersistentCacheImpl(); + AbstractPersistentCache cache2 = new PersistentCacheImpl(); + + cache1.linkWith(cache2); + + //segment not in either cache + Buffer segment = cache1.readSegment(0 ,0, () -> null); + + assertNull(segment); + assertEquals(1, cache1.getCacheStats().getMissCount()); + 
assertEquals(1, cache2.getCacheStats().getMissCount()); + assertEquals(0, cache1.getCacheStats().getHitCount()); + assertEquals(0, cache2.getCacheStats().getHitCount()); + assertEquals(0, cache1.segmentCacheStats.getLoadCount()); + assertEquals(0, cache2.segmentCacheStats.getLoadCount()); + assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount()); + assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount()); + + //segment in first cache + cache1.writeSegment(1, 1, Buffer.wrap(new byte[]{1})); + segment = cache1.readSegment(1 ,1, () -> null); + + assertNotNull(segment); + assertEquals(1, cache1.getCacheStats().getMissCount()); + assertEquals(1, cache2.getCacheStats().getMissCount()); + assertEquals(1, cache1.getCacheStats().getHitCount()); + assertEquals(0, cache2.getCacheStats().getHitCount()); + assertEquals(0, cache1.segmentCacheStats.getLoadCount()); + assertEquals(0, cache2.segmentCacheStats.getLoadCount()); + assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount()); + assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount()); + + //segment in second cache + cache2.writeSegment(2, 2, Buffer.wrap(new byte[]{2})); + segment = cache1.readSegment(2 ,2, () -> null); + + assertNotNull(segment); + assertEquals(2, cache1.getCacheStats().getMissCount()); + assertEquals(1, cache2.getCacheStats().getMissCount()); + assertEquals(1, cache1.getCacheStats().getHitCount()); + assertEquals(1, cache2.getCacheStats().getHitCount()); + assertEquals(1, cache1.segmentCacheStats.getLoadCount()); + assertEquals(0, cache2.segmentCacheStats.getLoadCount()); + assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount()); + assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount()); + + //segment loaded from the remote storage + segment = cache1.readSegment(3 ,3, () -> Buffer.wrap(new byte[]{3})); + + assertNotNull(segment); + assertEquals(3, cache1.getCacheStats().getMissCount()); + assertEquals(2, cache2.getCacheStats().getMissCount()); + 
assertEquals(1, cache1.getCacheStats().getHitCount()); + assertEquals(1, cache2.getCacheStats().getHitCount()); + assertEquals(2, cache1.segmentCacheStats.getLoadCount()); + assertEquals(1, cache2.segmentCacheStats.getLoadCount()); + assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount()); + assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount()); + + //exception while loading segment from the remote storage + segment = cache1.readSegment(4 ,4, () -> { + throw new Exception(); + }); + + assertNull(segment); + assertEquals(4, cache1.getCacheStats().getMissCount()); + assertEquals(3, cache2.getCacheStats().getMissCount()); + assertEquals(1, cache1.getCacheStats().getHitCount()); + assertEquals(1, cache2.getCacheStats().getHitCount()); + assertEquals(2, cache1.segmentCacheStats.getLoadCount()); + assertEquals(2, cache2.segmentCacheStats.getLoadCount()); + assertEquals(0, cache1.segmentCacheStats.getLoadExceptionCount()); + assertEquals(1, cache2.segmentCacheStats.getLoadExceptionCount()); + + //linked cache throws exception + cache2 = new PersistentCacheImpl(){ + @Override + protected Buffer readSegmentInternal(long msb, long lsb) { + throw new RuntimeException(); + } + }; + cache1.linkWith(cache2); + segment = cache1.readSegment(5 ,5, () -> Buffer.wrap(new byte[]{5})); + + assertNull(segment); + assertEquals(5, cache1.getCacheStats().getMissCount()); + assertEquals(0, cache2.getCacheStats().getMissCount()); + assertEquals(1, cache1.getCacheStats().getHitCount()); + assertEquals(0, cache2.getCacheStats().getHitCount()); + assertEquals(3, cache1.segmentCacheStats.getLoadCount()); + assertEquals(0, cache2.segmentCacheStats.getLoadCount()); + assertEquals(1, cache1.segmentCacheStats.getLoadExceptionCount()); + assertEquals(0, cache2.segmentCacheStats.getLoadExceptionCount()); + + cache2.close(); + cache1.close(); + } + + class PersistentCacheImpl extends AbstractPersistentCache { + HashMap segments = new HashMap<>(); + + public PersistentCacheImpl() { 
+ segmentCacheStats = new SegmentCacheStats("stats", () -> maximumWeight, () -> elementCount.get(), () -> currentWeight.get(), () -> evictionCount.get()); + } + + long maximumWeight = Long.MAX_VALUE; + AtomicLong elementCount = new AtomicLong(); + AtomicLong currentWeight = new AtomicLong(); + AtomicLong evictionCount = new AtomicLong(); + + void AbstractPersistentCache() { + segmentCacheStats = new SegmentCacheStats("stats", () -> maximumWeight, () -> elementCount.get(), () -> currentWeight.get(), () -> evictionCount.get()); + } + + @Override + public boolean containsSegment(long msb, long lsb) { + return segments.containsKey(new UUID(msb, lsb)); + } + + @Override + public void writeSegment(long msb, long lsb, Buffer buffer) { + segments.put(new UUID(msb, lsb), buffer); + } + + @Override + public void cleanUp() { + + } + + @Override + protected Buffer readSegmentInternal(long msb, long lsb) { + return segments.get(new UUID(msb, lsb)); + } + } +}