diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java index 812e2a972d..62ea6a8620 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java @@ -71,6 +71,8 @@ import org.apache.jackrabbit.oak.plugins.blob.CachingBlobStore; import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob; import org.apache.jackrabbit.oak.plugins.document.DocumentNodeState.Children; import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache; +import org.apache.jackrabbit.oak.plugins.document.cache.prefetch.LoggingPrefetchAlgorithm; +import org.apache.jackrabbit.oak.plugins.document.cache.prefetch.PrefetchAlgorithm; import org.apache.jackrabbit.oak.plugins.document.locks.NodeDocumentLocks; import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; import org.apache.jackrabbit.oak.plugins.document.mongo.MongoBlobReferenceIterator; @@ -584,6 +586,7 @@ public class DocumentMK { private long maxReplicationLagMillis = TimeUnit.HOURS.toMillis(6); private boolean disableBranches; private boolean prefetchExternalChanges; + private boolean docStoreCachePrefetching; private Clock clock = Clock.SIMPLE; private Executor executor; private String persistentCacheURI = DEFAULT_PERSISTENT_CACHE_URI; @@ -1173,6 +1176,19 @@ public class DocumentMK { return gcMonitor; } + public Builder setDocStoreCachePrefetching(boolean docStoreCachePrefetching) { + this.docStoreCachePrefetching = docStoreCachePrefetching; + return this; + } + + public boolean getDocStoreCachePrefetching() { + return docStoreCachePrefetching; + } + + public PrefetchAlgorithm getPrefetchingAlgorithm() { + return new LoggingPrefetchAlgorithm(); + } + VersionGCSupport createVersionGCSupport() { DocumentStore store = getDocumentStore(); if (store instanceof MongoDocumentStore) { @@ -1381,7 +1397,6 @@ public class DocumentMK { blobStoreCacheStats = ((CachingBlobStore) blobStore).getCacheStats(); } } - } } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java index 076e0d0e66..8dd1a0e538 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java @@ -536,6 +536,9 @@ public final class DocumentNodeStore } else { readOnlyMode = false; } + if (builder.getDocStoreCachePrefetching()) { + s = new PrefetchingDocumentStoreWrapper(s, builder.getPrefetchingAlgorithm()); + } checkVersion(s, readOnlyMode); this.executor = builder.getExecutor(); this.clock = builder.getClock(); diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java index a5d2548285..1f2e826aad 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java @@ -288,6 +288,12 @@ public class DocumentNodeStoreService { ) public static final String PROP_ROLE = "role"; + @Property(boolValue = false, + label = "Use docStore cache prefetching", + description = "Property indicating that the DocumentStore cache prefetching should be enabled" + ) + public static final String PROP_CACHE_PREFETCHING = "cachePrefetching"; + private static enum DocumentStoreType { MONGO, RDB; @@ -482,6 +488,7 @@ public class DocumentNodeStoreService { int cacheStackMoveDistance = toInteger(prop(PROP_CACHE_STACK_MOVE_DISTANCE), DEFAULT_CACHE_STACK_MOVE_DISTANCE); boolean bundlingDisabled = toBoolean(prop(PROP_BUNDLING_DISABLED), DEFAULT_BUNDLING_DISABLED); boolean prefetchExternalChanges = toBoolean(prop(PROP_PREFETCH_EXTERNAL_CHANGES), false); + boolean docStoreCachePrefetching = toBoolean(prop(PROP_CACHE_PREFETCHING), false); int updateLimit = toInteger(prop(PROP_UPDATE_LIMIT), DocumentMK.UPDATE_LIMIT); long journalGCMaxAge = toLong(context.getProperties().get(PROP_JOURNAL_GC_MAX_AGE_MILLIS), DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS); DocumentMK.Builder mkBuilder = @@ -521,7 +528,8 @@ public class DocumentNodeStoreService { }). setPrefetchExternalChanges(prefetchExternalChanges). setUpdateLimit(updateLimit). - setJournalGCMaxAge(journalGCMaxAge); + setJournalGCMaxAge(journalGCMaxAge). + setDocStoreCachePrefetching(docStoreCachePrefetching); if (!Strings.isNullOrEmpty(persistentCache)) { mkBuilder.setPersistentCache(persistentCache); diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/PrefetchingDocumentStoreWrapper.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/PrefetchingDocumentStoreWrapper.java new file mode 100644 index 0000000000..6c1d5b5d56 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/PrefetchingDocumentStoreWrapper.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import org.apache.jackrabbit.oak.cache.CacheStats; +import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats; +import org.apache.jackrabbit.oak.plugins.document.cache.prefetch.PrefetchAlgorithm; +import org.apache.jackrabbit.oak.plugins.document.cache.prefetch.PrefetchTimeSeries; +import org.apache.jackrabbit.oak.plugins.document.cache.prefetch.Request; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.CheckForNull; +import javax.annotation.Nonnull; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +public class PrefetchingDocumentStoreWrapper implements DocumentStore, RevisionListener { + + private static final Logger LOG = LoggerFactory.getLogger(PrefetchingDocumentStoreWrapper.class); + + private final Cache sessions; + + private final DocumentStore store; + + private final PrefetchAlgorithm prefetchAlgorithm; + + public PrefetchingDocumentStoreWrapper(DocumentStore store, PrefetchAlgorithm prefetchAlgorithm) { + this.store = store; + this.prefetchAlgorithm = prefetchAlgorithm; + this.sessions = CacheBuilder.newBuilder() + .expireAfterWrite(2, TimeUnit.SECONDS) + .removalListener((RemovalListener) n -> n.getValue().onClose()) + .build(); + } + + private void handleRequest(Request request) { + sessions.cleanUp(); + + String threadName = request.getThreadName(); + if (!threadName.endsWith("HTTP/1.1")) { + return; + } + PrefetchTimeSeries series = null; + try { + series = sessions.get(threadName, () -> prefetchAlgorithm.newSession()); + } catch (ExecutionException e) { + LOG.error("Can't create new session", e); + } + series.onRequest(request); + } + + @CheckForNull + @Override + public T find(Collection collection, String key) throws DocumentStoreException { + if (collection == Collection.NODES) { + handleRequest(Request.createFindRequest(Thread.currentThread().getName(), key)); + } + return store.find(collection, key); + } + + @CheckForNull + @Override + public T find(Collection collection, String key, int maxCacheAge) throws DocumentStoreException { + if (collection == Collection.NODES) { + handleRequest(Request.createFindRequest(Thread.currentThread().getName(), key)); + } + return store.find(collection, key, maxCacheAge); + } + + @Nonnull + @Override + public List query(Collection collection, String fromKey, String toKey, int limit) throws DocumentStoreException { + if (collection == Collection.NODES) { + handleRequest(Request.createQueryRequest(Thread.currentThread().getName(), fromKey, toKey)); + } + return store.query(collection, fromKey, toKey, limit); + } + + @Nonnull + @Override + public List query(Collection collection, String fromKey, String toKey, String indexedProperty, long startValue, int limit) throws DocumentStoreException { + if (collection == Collection.NODES) { + handleRequest(Request.createQueryRequest(Thread.currentThread().getName(), fromKey, toKey, indexedProperty, startValue)); + } + return store.query(collection, fromKey, toKey, indexedProperty, startValue, limit); + } + + @Override + public void remove(Collection collection, String key) throws DocumentStoreException { + store.remove(collection, key); + } + + @Override + public void remove(Collection collection, List keys) throws DocumentStoreException { + store.remove(collection, keys); + } + + @Override + public int remove(Collection collection, Map> toRemove) throws DocumentStoreException { + return store.remove(collection, toRemove); + } + + @Override + public int remove(Collection collection, String indexedProperty, long startValue, long endValue) throws DocumentStoreException { + return store.remove(collection, indexedProperty, startValue, endValue); + } + + @Override + public boolean create(Collection collection, List updateOps) throws IllegalArgumentException, DocumentStoreException { + return store.create(collection, updateOps); + } + + @Override + public void update(Collection collection, List keys, UpdateOp updateOp) throws IllegalArgumentException, DocumentStoreException { + store.update(collection, keys, updateOp); + } + + @CheckForNull + @Override + public T createOrUpdate(Collection collection, UpdateOp update) throws IllegalArgumentException, DocumentStoreException { + return store.createOrUpdate(collection, update); + } + + @Override + public List createOrUpdate(Collection collection, List updateOps) throws DocumentStoreException { + return store.createOrUpdate(collection, updateOps); + } + + @CheckForNull + @Override + public T findAndUpdate(Collection collection, UpdateOp update) throws DocumentStoreException { + return store.findAndUpdate(collection, update); + } + + @CheckForNull + @Override + public CacheInvalidationStats invalidateCache() { + return store.invalidateCache(); + } + + @CheckForNull + @Override + public CacheInvalidationStats invalidateCache(Iterable keys) { + return store.invalidateCache(keys); + } + + @Override + public void invalidateCache(Collection collection, String key) { + store.invalidateCache(collection, key); + } + + @Override + public void dispose() { + store.dispose(); + } + + @CheckForNull + @Override + public T getIfCached(Collection collection, String key) { + return store.getIfCached(collection, key); + } + + @Override + public void setReadWriteMode(String readWriteMode) { + store.setReadWriteMode(readWriteMode); + } + + @CheckForNull + @Override + public Iterable getCacheStats() { + return store.getCacheStats(); + } + + @Override + public Map getMetadata() { + return store.getMetadata(); + } + + @Override + public long determineServerTimeDifferenceMillis() throws UnsupportedOperationException, DocumentStoreException { + return store.determineServerTimeDifferenceMillis(); + } + + @Override + public void updateAccessedRevision(RevisionVector revision, int currentClusterId) { + if (store instanceof RevisionListener) { + ((RevisionListener) store).updateAccessedRevision(revision, currentClusterId); + } + } +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/prefetch/LoggingPrefetchAlgorithm.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/prefetch/LoggingPrefetchAlgorithm.java new file mode 100644 index 0000000000..d57c07f4e4 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/prefetch/LoggingPrefetchAlgorithm.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.cache.prefetch; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class LoggingPrefetchAlgorithm implements PrefetchAlgorithm { + + private static final Logger LOG = LoggerFactory.getLogger(LoggingPrefetchAlgorithm.class); + + @Override + public PrefetchTimeSeries newSession() { + return new PrefetchTimeSeries() { + + private final List requests = new ArrayList<>(); + + @Override + public void onRequest(Request request) { + requests.add(request); + } + + @Override + public List getCandidates() { + return Collections.emptyList(); + } + + @Override + public void onClose() { + if (!requests.isEmpty()) { + StringBuilder msg = new StringBuilder("Session summary:\n"); + msg.append("Session: ").append(requests.get(0).getThreadName()).append('\n'); + requests.forEach(r -> msg.append(r).append('\n')); + LOG.info(msg.toString()); + } + } + }; + } +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/prefetch/PrefetchAlgorithm.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/prefetch/PrefetchAlgorithm.java new file mode 100644 index 0000000000..e754ace865 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/prefetch/PrefetchAlgorithm.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.cache.prefetch; + +/** + * Implementation of the PrefetchAlgorithm analyses the outgoing document DB + * requests and tries to predict the future ones. + */ +public interface PrefetchAlgorithm { + + /** + * Create a new session. The DocumentStore implementation will use this object + * to report all the DB requests that belongs together (eg. they are bound + * to the same thread or HTTP request) and ask it for the prefetch candidates. + * + * @return A new prefetch session. + */ + PrefetchTimeSeries newSession(); + +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/prefetch/PrefetchTimeSeries.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/prefetch/PrefetchTimeSeries.java new file mode 100644 index 0000000000..871afce7a3 --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/prefetch/PrefetchTimeSeries.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.cache.prefetch; + +import java.util.List; + +/** + * This class represents a series of related requests. Depending on the DocumentStore + * implementation, they may be done within a single HTTP request, client session, + * etc. The prefetch algorithm should assume that these requests belongs to the + * same cluster. + *

+ * + */ +public interface PrefetchTimeSeries { + + /** + * The onRequest() method is invoked every time the DocumentStore client + * requests a document. It doesn't matter whether the document is served + * from the cache or requested from the server. This allows the underlying + * algorithm to build the prediction model. + * + * @param request + */ + void onRequest(Request request); + + /** + * Predict the candidates for the subsequent requests. + * + * @return A list of potential candidates for the next request. + */ + List getCandidates(); + + /** + * This method is called when the session is closed. There won't be any + * more request calls related to this series. + */ + void onClose(); +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/prefetch/Request.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/prefetch/Request.java new file mode 100644 index 0000000000..a9195f814f --- /dev/null +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/prefetch/Request.java @@ -0,0 +1,104 @@ +package org.apache.jackrabbit.oak.plugins.document.cache.prefetch; + +public class Request { + + /** + * The type of the request. + */ + enum RequestType { + /** + * Request for a single document. + */ + FIND, + + /** + * Request for a range of documents (typically a children of a node). + */ + QUERY, + + /** + * Request for a range of documents with an extra condition. + */ + QUERY_WITH_INDEXED_PROPERTY + } + + private final String threadName; + + private final RequestType type; + + private final String key; + + private final String keyFrom; + + private final String keyTo; + + private final String indexedProperty; + + private final long startValue; + + private Request(String threadName, RequestType type, String key, String keyFrom, String keyTo, String indexedProperty, long startValue) { + this.threadName = threadName; + this.type = type; + this.key = key; + this.keyFrom = keyFrom; + this.keyTo = keyTo; + this.indexedProperty = indexedProperty; + this.startValue = startValue; + } + + public static Request createFindRequest(String threadName, String key) { + return new Request(threadName, RequestType.FIND, key, null, null, null, 0); + } + + public static Request createQueryRequest(String threadName, String keyFrom, String keyTo) { + return new Request(threadName, RequestType.QUERY, null, keyFrom, keyTo, null, 0); + } + + public static Request createQueryRequest(String threadName, String keyFrom, String keyTo, String indexedProperty, long startValue) { + return new Request(threadName, RequestType.QUERY_WITH_INDEXED_PROPERTY, null, keyFrom, keyTo, indexedProperty, startValue); + } + + public RequestType getType() { + return type; + } + + public String getKey() { + return key; + } + + public String getKeyFrom() { + return keyFrom; + } + + public String getKeyTo() { + return keyTo; + } + + public String getIndexedProperty() { + return indexedProperty; + } + + public long getStartValue() { + return startValue; + } + + public String getThreadName() { + return threadName; + } + + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + result.append("Request"); + if (type == RequestType.FIND) { + result.append('[').append(key).append(']'); + } + if (type == RequestType.QUERY || type == RequestType.QUERY_WITH_INDEXED_PROPERTY) { + result.append('[').append(keyFrom).append("]...[").append(keyTo).append(']'); + } + if (type == RequestType.QUERY_WITH_INDEXED_PROPERTY) { + result.append('[').append(indexedProperty).append(">=").append(startValue).append(']'); + } + return result.toString(); + } +}