From 46b48afe1170dd9f5542a4018075eb1754d7f8ec Mon Sep 17 00:00:00 2001
From: Marcel Reutegger
Date: Wed, 1 Jul 2015 13:37:35 +0000
Subject: [PATCH] OAK-2829: Comparing node states for external changes is too slow

OAK-3002: Optimize docCache and docChildrenCache invalidation by filtering using journal

Merged revisions 1678023,1678171,1684820,1685590,1685964,1685977,1685989,1686023,1686032,1688179 from trunk

Conflicts:
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
- [conflicts: imports affected and minor manual merge required]
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
- [manual merge required for access modifier change of applyChanges()]
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
- [conflicts: only imports affected]
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
- [conflict with OAK-2074 not available in 1.0 branch]
- [conflict with OAK-2131 not available in 1.0 branch]
- [conflict with OAK-2324 not available in 1.0 branch]
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
- [conflict with OAK-2324 not available in 1.0 branch]
- [conflict with OAK-1768]
- [conflict with OAK-2888 only available in 1.0 branch]
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
- [conflict with OAK-1641 not available in 1.0 branch]
- [conflict with OAK-2681 not available in 1.0 branch: findUncachedWithRetry not avail in 1.0]
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java
- [conflicts: only imports affected]
oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
- [conflicts: only imports affected]
oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest.java
- [conflict with OAK-2717 not available in 1.0 branch]
- [ADDED JcrCreator class plus OakRepositoryFixture.setUpCluster(int n, JcrCreator customizer) from trunk]

Additional Compile Problems:
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java
- [conflict with OAK-2888 only available in 1.0 branch, fixed, Snapshot.IGNORE added]

Additional Test Problems:
oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
- [conflict with OAK-2131 not available in 1.0 branch, hence test logic had to be adapted/fixed]
---
 .../jackrabbit/oak/commons/sort/StringSort.java    |  14 +-
 .../jackrabbit/oak/commons/sort/package-info.java  |   2 +-
 .../oak/plugins/document/Collection.java           |  14 +
 .../jackrabbit/oak/plugins/document/Commit.java    |  27 +-
 .../jackrabbit/oak/plugins/document/DiffCache.java |   5 +-
 .../oak/plugins/document/DocumentNodeState.java    |   7 +-
 .../oak/plugins/document/DocumentNodeStore.java    | 190 ++++--
 .../plugins/document/DocumentNodeStoreService.java |  36 ++
 .../oak/plugins/document/DocumentStore.java        |   7 +
 .../oak/plugins/document/JournalEntry.java         | 505 +++++++++++++++++++++
 .../plugins/document/JournalGarbageCollector.java  | 141 ++++++
 .../oak/plugins/document/LastRevRecoveryAgent.java |  44 +-
 .../oak/plugins/document/LocalDiffCache.java       |   3 +-
 .../oak/plugins/document/MemoryDiffCache.java      |   3 +-
 .../oak/plugins/document/MergeCommit.java          |  18 +-
 .../oak/plugins/document/TieredDiffCache.java      |  15 +-
 .../oak/plugins/document/UnsavedModifications.java |  17 +-
 .../document/memory/MemoryDocumentStore.java       |  16 +-
 .../plugins/document/mongo/CacheInvalidator.java   |   4 +-
 .../oak/plugins/document/mongo/MongoDiffCache.java |   7 +-
 .../plugins/document/mongo/MongoDocumentStore.java | 118 +++--
 .../oak/plugins/document/rdb/RDBDocumentStore.java |  12 +-
 .../document/util/LoggingDocumentStoreWrapper.java |  11 +
 .../util/SynchronizingDocumentStoreWrapper.java    |   5 +
 .../document/util/TimingDocumentStoreWrapper.java  |  12 +
 .../oak/plugins/document/util/Utils.java           |  29 ++
 .../oak/plugins/observation/NodeObserver.java      |   2 +-
 .../oak/plugins/document/AbstractJournalTest.java  | 217 +++++++++
 .../oak/plugins/document/AmnesiaDiffCache.java     |   2 +-
 .../oak/plugins/document/ClusterTest.java          |   4 +
 .../plugins/document/CountingDocumentStore.java    | 222 +++++++++
 .../plugins/document/CountingTieredDiffCache.java  |  65 +++
 .../plugins/document/DocumentNodeStoreTest.java    |   7 +-
 .../oak/plugins/document/JournalEntryTest.java     | 149 ++++++
 .../oak/plugins/document/JournalTest.java          | 440 ++++++++++++++++++
 .../plugins/document/UnsavedModificationsTest.java |   9 +-
 .../oak/plugins/document/mongo/JournalIT.java      | 222 +++++++++
 .../plugins/document/mongo/MongoDiffCacheTest.java |   2 +-
 .../oak/jcr/observation/ChangeProcessor.java       |   7 +
 .../jackrabbit/oak/benchmark/ObservationTest.java  | 185 ++++++--
 .../apache/jackrabbit/oak/fixture/JcrCreator.java  |  34 ++
 .../oak/fixture/OakRepositoryFixture.java          |   7 +-
 42 files changed, 2686 insertions(+), 150 deletions(-)
 create mode 100644 oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
 create mode 100644 oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java
 create mode 100644 oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java
 create mode 100644 oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java
 create mode 100644 oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java
 create mode 100644 oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java
 create mode 100644 oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java
 create mode 100644 oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java
 create mode 100644 oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java

diff --git a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java
index 6ccbfa6..c98dad3 100644
--- a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java
+++ b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/StringSort.java
@@ -35,6 +35,7 @@ import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 import com.google.common.io.Files;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.LineIterator;
 import org.slf4j.Logger;
@@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory;
  * the list would be maintained in memory. If the size crosses the required threshold then
  * the sorting would be performed externally
  */
-public class StringSort implements Closeable {
+public class StringSort implements Iterable<String>, Closeable {
     private final Logger log = LoggerFactory.getLogger(getClass());

     public static final int BATCH_SIZE = 2048;
@@ -117,6 +118,17 @@ public class StringSort implements Closeable {
         }
     }

+    @Override
+    public Iterator<String> iterator() {
+        try {
+            return getIds();
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    //--------------------------< internal >------------------------------------
+
     private void addToBatch(String id) throws IOException {
         inMemBatch.add(id);
         if (inMemBatch.size() >= BATCH_SIZE) {
diff --git a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
index 44a5956..ce74b99 100644
--- a/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
+++ b/oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/package-info.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@Version("1.0")
+@Version("1.1")
 @Export(optional = "provide:=true")
 package org.apache.jackrabbit.oak.commons.sort;
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java
index 14cbce3..9ee0359 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Collection.java
@@ -70,6 +70,20 @@ public abstract class Collection<T extends Document> {
         }
     };

+    /**
+     * The 'journal' collection contains documents with consolidated
+     * diffs for changes performed by a cluster node between two background
+     * updates.
+     */
+    public static final Collection<JournalEntry> JOURNAL =
+            new Collection<JournalEntry>("journal") {
+        @Nonnull
+        @Override
+        public JournalEntry newDocument(DocumentStore store) {
+            return new JournalEntry(store);
+        }
+    };
+
     private final String name;

     public Collection(String name) {
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
index c535883..7c5b395 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java
@@ -37,6 +37,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Collections.singletonList;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.COLLISIONS;
 import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.SPLIT_CANDIDATE_THRESHOLD;
@@ -48,7 +50,7 @@ public class Commit {

     private static final Logger LOG = LoggerFactory.getLogger(Commit.class);

-    private final DocumentNodeStore nodeStore;
+    protected final DocumentNodeStore nodeStore;
     private final DocumentNodeStoreBranch branch;
     private final Revision baseRevision;
     private final Revision revision;
@@ -124,6 +126,15 @@ public class Commit {
         return baseRevision;
     }

+    /**
+     * @return all modified paths, including ancestors without explicit
+     *         modifications.
+     */
+    @Nonnull
+    Iterable<String> getModifiedPaths() {
+        return modifiedNodes;
+    }
+
     void addNodeDiff(DocumentNodeState n) {
         diff.tag('+').key(n.getPath());
         diff.object();
@@ -271,7 +282,7 @@ public class Commit {
         // so that all operations can be rolled back if there is a conflict
         ArrayList<UpdateOp> opLog = new ArrayList<UpdateOp>();

-        //Compute the commit root
+        // Compute the commit root
         for (String p : operations.keySet()) {
             markChanged(p);
             if (commitRootPath == null) {
@@ -285,6 +296,16 @@ public class Commit {
                 }
             }
         }
+
+        // push branch changes to journal
+        if (baseBranchRevision != null) {
+            // store as external change
+            JournalEntry doc = JOURNAL.newDocument(store);
+            doc.modified(modifiedNodes);
+            Revision r = revision.asBranchRevision();
+            store.create(JOURNAL, singletonList(doc.asUpdateOp(r)));
+        }
+
         int commitRootDepth = PathUtils.getDepth(commitRootPath);
         // check if there are real changes on the commit root
         boolean commitRootHasChanges = operations.containsKey(commitRootPath);
@@ -583,7 +604,7 @@ public class Commit {
             }
             list.add(p);
         }
-        DiffCache.Entry cacheEntry = nodeStore.getDiffCache().newEntry(before, revision);
+        DiffCache.Entry cacheEntry = nodeStore.getDiffCache().newEntry(before, revision, true);
         List<String> added = new ArrayList<String>();
         List<String> removed = new ArrayList<String>();
         List<String> changed = new ArrayList<String>();
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java
index 83e1f00..82c80e2 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DiffCache.java
@@ -56,11 +56,14 @@ public interface DiffCache {
      *
      * @param from the from revision.
      * @param to the to revision.
+     * @param local true indicates that the entry results from a local change,
+     *              false if it results from an external change
      * @return the cache entry.
      */
     @Nonnull
     Entry newEntry(@Nonnull Revision from,
-                   @Nonnull Revision to);
+                   @Nonnull Revision to,
+                   boolean local);

     /**
      * @return the statistics for this cache.
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java
index 9a78dd3..518d97a 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeState.java
@@ -396,10 +396,9 @@ public class DocumentNodeState extends AbstractNodeState implements CacheValue {
     @Override
     public String toString() {
         StringBuilder buff = new StringBuilder();
-        buff.append("path: ").append(path).append('\n');
-        buff.append("rev: ").append(rev).append('\n');
-        buff.append(properties);
-        buff.append('\n');
+        buff.append("{ path: '").append(path).append("', ");
+        buff.append("rev: '").append(rev).append("', ");
+        buff.append("properties: '").append(properties.values()).append("' }");
         return buff.toString();
     }
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
index 4915ab4..eb9cd3e 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
@@ -21,14 +21,19 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.toArray;
 import static com.google.common.collect.Iterables.transform;
+import static java.util.Collections.singletonList;
 import static org.apache.jackrabbit.oak.api.CommitFailedException.MERGE;
 import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
 import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
 import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.FAST_DIFF;
 import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.MANY_CHILDREN_THRESHOLD;
+import static org.apache.jackrabbit.oak.plugins.document.JournalEntry.fillExternalChanges;
 import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
 import static org.apache.jackrabbit.oak.plugins.document.UpdateOp.Operation;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.asStringValueIterable;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getIdFromPath;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.pathToId;
 import static org.apache.jackrabbit.oak.plugins.document.util.Utils.unshareString;

 import java.io.Closeable;
@@ -81,6 +86,7 @@ import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.commons.json.JsopStream;
 import org.apache.jackrabbit.oak.commons.json.JsopWriter;
+import org.apache.jackrabbit.oak.commons.sort.StringSort;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.cache.CacheStats;
@@ -235,6 +241,12 @@ public final class DocumentNodeStore
     private final Map<String, String> splitCandidates = Maps.newConcurrentMap();

     /**
+     * Summary of changes done by this cluster node, to be persisted by the
+     * background update thread.
+     */
+    private JournalEntry changes;
+
+    /**
      * The last known revision for each cluster instance.
      *
      * Key: the machine id, value: revision.
@@ -347,6 +359,8 @@ public final class DocumentNodeStore

     private final VersionGarbageCollector versionGarbageCollector;

+    private final JournalGarbageCollector journalGarbageCollector;
+
     private final Executor executor;

     private final LastRevRecoveryAgent lastRevRecoveryAgent;
@@ -370,6 +384,7 @@ public final class DocumentNodeStore
             s = new LoggingDocumentStoreWrapper(s);
         }
         this.store = s;
+        this.changes = Collection.JOURNAL.newDocument(s);
         this.executor = builder.getExecutor();
         this.clock = builder.getClock();
         int cid = builder.getClusterId();
@@ -389,6 +404,7 @@ public final class DocumentNodeStore
         this.asyncDelay = builder.getAsyncDelay();
         this.versionGarbageCollector = new VersionGarbageCollector(
                 this, builder.createVersionGCSupport());
+        this.journalGarbageCollector = new JournalGarbageCollector(this);
         this.lastRevRecoveryAgent = new LastRevRecoveryAgent(this);
         this.disableBranches = builder.isDisableBranches();
         this.missing = new DocumentNodeState(this, "MISSING", new Revision(0, 0, 0)) {
@@ -416,7 +432,8 @@ public final class DocumentNodeStore
         checkpoints = new Checkpoints(this);

         // check if root node exists
-        if (store.find(Collection.NODES, Utils.getIdFromPath("/")) == null) {
+        NodeDocument rootDoc = store.find(NODES, Utils.getIdFromPath("/"));
+        if (rootDoc == null) {
             // root node is missing: repository is not initialized
             Revision head = newRevision();
             Commit commit = new Commit(this, head, null, null);
@@ -437,6 +454,11 @@ public final class DocumentNodeStore
                 // no revision read from other cluster nodes
                 setHeadRevision(newRevision());
             }
+            // check if _lastRev for our clusterId exists
+            if (!rootDoc.getLastRev().containsKey(clusterId)) {
+                unsavedLastRevisions.put("/", headRevision);
+                backgroundWrite();
+            }
         }
         getRevisionComparator().add(headRevision, Revision.newRevision(0));
@@ -630,6 +652,8 @@ public final class DocumentNodeStore
         Revision before = getHeadRevision();
         // apply changes to cache based on before revision
         c.applyToCache(before, false);
+        // track modified paths
+        changes.modified(c.getModifiedPaths());
         // update head revision
         setHeadRevision(c.getRevision());
         dispatcher.contentChanged(getRoot(), info);
@@ -1005,15 +1029,13 @@ public final class DocumentNodeStore
         }
         final Revision readRevision = parent.getLastRevision();
-        return transform(getChildren(parent, name, limit).children,
-                new Function<String, DocumentNodeState>() {
+        return transform(getChildren(parent, name, limit).children, new Function<String, DocumentNodeState>() {
             @Override
             public DocumentNodeState apply(String input) {
                 String p = concat(parent.getPath(), input);
                 DocumentNodeState result = getNode(p, readRevision);
                 if (result == null) {
-                    throw new DocumentStoreException("DocumentNodeState is null for revision " + readRevision + " of " + p
-                            + " (aborting getChildNodes())");
+                    throw new DocumentStoreException("DocumentNodeState is null for revision " + readRevision + " of " + p + " (aborting getChildNodes())");
                 }
                 return result;
             }
@@ -1032,10 +1054,8 @@ public final class DocumentNodeStore
                     path, readRevision);
             return null;
         }
-        final DocumentNodeState result = doc.getNodeAtRevision(this,
-                readRevision, lastRevision);
-        PERFLOG.end(start, 1, "readNode: path={}, readRevision={}", path,
-                readRevision);
+        final DocumentNodeState result = doc.getNodeAtRevision(this, readRevision, lastRevision);
+        PERFLOG.end(start, 1, "readNode: path={}, readRevision={}", path, readRevision);
         return result;
     }
@@ -1052,11 +1072,11 @@ public final class DocumentNodeStore
      * @param changed the list of changed child nodes.
      *
      */
-    public void applyChanges(Revision rev, String path,
-                             boolean isNew, boolean pendingLastRev,
-                             boolean isBranchCommit, List<String> added,
-                             List<String> removed, List<String> changed,
-                             DiffCache.Entry cacheEntry) {
+    void applyChanges(Revision rev, String path,
+                      boolean isNew, boolean pendingLastRev,
+                      boolean isBranchCommit, List<String> added,
+                      List<String> removed, List<String> changed,
+                      DiffCache.Entry cacheEntry) {
         LastRevTracker tracker = createTracker(rev);
         if (disableBranches) {
             if (pendingLastRev) {
@@ -1086,13 +1106,13 @@ public final class DocumentNodeStore
         // update diff cache
         JsopWriter w = new JsopStream();
         for (String p : added) {
-            w.tag('+').key(PathUtils.getName(p)).object().endObject().newline();
+            w.tag('+').key(PathUtils.getName(p)).object().endObject();
         }
         for (String p : removed) {
-            w.tag('-').value(PathUtils.getName(p)).newline();
+            w.tag('-').value(PathUtils.getName(p));
         }
         for (String p : changed) {
-            w.tag('^').key(PathUtils.getName(p)).object().endObject().newline();
+            w.tag('^').key(PathUtils.getName(p)).object().endObject();
         }
         cacheEntry.append(path, w.toString());
@@ -1133,6 +1153,15 @@ public final class DocumentNodeStore
     }

     /**
+     * Called when a branch is merged.
+     *
+     * @param revisions the revisions of the merged branch commits.
+     */
+    void revisionsMerged(@Nonnull Iterable<Revision> revisions) {
+        changes.branchCommit(revisions);
+    }
+
+    /**
      * Updates a commit root document.
      *
      * @param commit the updates to apply on the commit root document.
@@ -1308,6 +1337,7 @@ public final class DocumentNodeStore
         UpdateOp op = new UpdateOp(Utils.getIdFromPath("/"), false);
         NodeDocument.setModified(op, commit.getRevision());
         if (b != null) {
+            commit.addBranchCommits(b);
             Iterator<Revision> mergeCommits = commit.getMergeRevisions().iterator();
             for (Revision rev : b.getCommits()) {
                 rev = rev.asTrunkRevision();
@@ -1673,6 +1703,8 @@ public final class DocumentNodeStore
         // then we saw this new revision (from another cluster node)
         Revision otherSeen = Revision.newRevision(0);

+        StringSort externalSort = JournalEntry.newSorter();
+
         Map<Revision, Revision> externalChanges = Maps.newHashMap();
         for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
             int machineId = e.getKey();
@@ -1693,6 +1725,16 @@ public final class DocumentNodeStore
                         || r.getTimestamp() > revisionPurgeMillis()) {
                     externalChanges.put(r, otherSeen);
                 }
+                // collect external changes
+                if (last != null && externalSort != null) {
+                    // add changes for this particular clusterId to the externalSort
+                    try {
+                        fillExternalChanges(externalSort, last, r, store);
+                    } catch (IOException e1) {
+                        LOG.error("backgroundRead: Exception while reading external changes from journal: "+e1, e1);
+                        externalSort = null;
+                    }
+                }
             }
         }
@@ -1701,9 +1743,38 @@ public final class DocumentNodeStore

         if (!externalChanges.isEmpty()) {
             // invalidate caches
-            stats.cacheStats = store.invalidateCache();
-            // TODO only invalidate affected items
-            docChildrenCache.invalidateAll();
+            if (externalSort == null) {
+                // if no externalSort available, then invalidate the classic way: everything
+                stats.cacheStats = store.invalidateCache();
+                docChildrenCache.invalidateAll();
+            } else {
+                try {
+                    externalSort.sort();
+                    stats.cacheStats = store.invalidateCache(pathToId(externalSort));
+                    // OAK-3002: only invalidate affected items (using journal)
+                    long origSize = docChildrenCache.size();
+                    if (origSize == 0) {
+                        // if docChildrenCache is empty, don't bother
+                        // calling invalidateAll either way
+                        // (esp calling invalidateAll(Iterable) will
+                        // potentially iterate over all keys even though
+                        // there's nothing to be deleted)
+                        LOG.trace("backgroundRead: docChildrenCache nothing to invalidate");
+                    } else {
+                        // however, if the docChildrenCache is not empty,
+                        // use the invalidateAll(Iterable) variant,
+                        // passing it an Iterable<StringValue>, as that's
+                        // what is contained in the cache
+                        docChildrenCache.invalidateAll(asStringValueIterable(externalSort));
+                        long newSize = docChildrenCache.size();
+                        LOG.trace("backgroundRead: docChildrenCache invalidation result: orig: {}, new: {} ", origSize, newSize);
+                    }
+                } catch (Exception ioe) {
+                    LOG.error("backgroundRead: got IOException during external sorting/cache invalidation (as a result, invalidating entire cache): "+ioe, ioe);
+                    stats.cacheStats = store.invalidateCache();
+                    docChildrenCache.invalidateAll();
+                }
+            }

             stats.cacheInvalidationTime = clock.getTime() - time;
             time = clock.getTime();
@@ -1712,7 +1783,6 @@ public final class DocumentNodeStore
             backgroundOperationLock.writeLock().lock();
             try {
                 stats.lock = clock.getTime() - time;
-                time = clock.getTime();

                 // the latest revisions of the current cluster node
                 // happened before the latest revisions of other cluster nodes
@@ -1721,9 +1791,24 @@ public final class DocumentNodeStore
                 for (Map.Entry<Revision, Revision> e : externalChanges.entrySet()) {
                     revisionComparator.add(e.getKey(), e.getValue());
                 }
+
+                Revision oldHead = headRevision;
                 // the new head revision is after other revisions
                 setHeadRevision(newRevision());
                 if (dispatchChange) {
+                    time = clock.getTime();
+                    if (externalSort != null) {
+                        // then there were external changes and reading them
+                        // was successful -> apply them to the diff cache
+                        try {
+                            JournalEntry.applyTo(externalSort, diffCache, oldHead, headRevision);
+                        } catch (Exception e1) {
+                            LOG.error("backgroundRead: Exception while processing external changes from journal: "+e1, e1);
+                        }
+                    }
+                    stats.populateDiffCache = clock.getTime() - time;
+                    time = clock.getTime();
+
                     dispatcher.contentChanged(getRoot().fromExternalChange(), null);
                 }
             } finally {
@@ -1742,6 +1827,7 @@ public final class DocumentNodeStore
         CacheInvalidationStats cacheStats;
         long readHead;
         long cacheInvalidationTime;
+        long populateDiffCache;
         long lock;
         long dispatchChanges;
         long purge;
@@ -1756,6 +1842,7 @@ public final class DocumentNodeStore
                     "cacheStats:" + cacheStatsMsg +
                     ", head:" + readHead +
                     ", cache:" + cacheInvalidationTime +
+                    ", diff: " + populateDiffCache +
                     ", lock:" + lock +
                     ", dispatch:" + dispatchChanges +
                     ", purge:" + purge +
@@ -1844,7 +1931,15 @@ public final class DocumentNodeStore
     }

     BackgroundWriteStats backgroundWrite() {
-        return unsavedLastRevisions.persist(this, backgroundOperationLock.writeLock());
+        return unsavedLastRevisions.persist(this, new UnsavedModifications.Snapshot() {
+            @Override
+            public void acquiring() {
+                if (store.create(JOURNAL,
+                        singletonList(changes.asUpdateOp(getHeadRevision())))) {
+                    changes = JOURNAL.newDocument(getDocumentStore());
+                }
+            }
+        }, backgroundOperationLock.writeLock());
     }

     //-----------------------------< internal >---------------------------------
@@ -1966,19 +2061,23 @@ public final class DocumentNodeStore
                 case '^': {
                     String name = unshareString(t.readString());
                     t.read(':');
-                    if (t.matches('{')) {
-                        t.read('}');
-                        continueComparison = diff.childNodeChanged(name,
-                                base.getChildNode(name),
-                                node.getChildNode(name));
-                    } else if (t.matches('[')) {
-                        // ignore multi valued property
-                        while (t.read() != ']') {
-                            // skip values
+                    t.read('{');
+                    t.read('}');
+                    NodeState baseChild = base.getChildNode(name);
+                    NodeState nodeChild = node.getChildNode(name);
+                    if (baseChild.exists()) {
+                        if (nodeChild.exists()) {
+                            continueComparison = diff.childNodeChanged(name,
+                                    baseChild, nodeChild);
+                        } else {
+                            continueComparison = diff.childNodeDeleted(name,
+                                    baseChild);
                         }
                     } else {
-                        // ignore single valued property
-                        t.read();
+                        if (nodeChild.exists()) {
+                            continueComparison = diff.childNodeAdded(name,
+                                    nodeChild);
+                        }
                     }
                     break;
                 }
@@ -2069,13 +2168,14 @@ public final class DocumentNodeStore
             }
         }
+        String diff = w.toString();
         if (debug) {
             long end = now();
-            LOG.debug("Diff performed via '{}' at [{}] between revisions [{}] => [{}] took {} ms ({} ms)",
+            LOG.debug("Diff performed via '{}' at [{}] between revisions [{}] => [{}] took {} ms ({} ms), diff '{}'",
                     diffAlgo, from.getPath(), fromRev, toRev,
-                    end - start, getChildrenDoneIn - start);
+                    end - start, getChildrenDoneIn - start, diff);
         }
-        return w.toString();
+        return diff;
     }

     private void diffManyChildren(JsopWriter w, String path, Revision fromRev, Revision toRev) {
@@ -2123,17 +2223,17 @@ public final class DocumentNodeStore
                     if (a == null && b == null) {
                         // ok
                     } else if (a == null || b == null || !a.equals(b)) {
-                        w.tag('^').key(name).object().endObject().newline();
+                        w.tag('^').key(name).object().endObject();
                     }
                 } else {
                     // does not exist in toRev -> was removed
-                    w.tag('-').value(name).newline();
+                    w.tag('-').value(name);
                 }
             } else {
                 // does not exist in fromRev
                 if (toNode != null) {
                     // exists in toRev
-                    w.tag('+').key(name).object().endObject().newline();
+                    w.tag('+').key(name).object().endObject();
                 } else {
                     // does not exist in either revision
                     // -> do nothing
@@ -2160,7 +2260,7 @@ public final class DocumentNodeStore
         Set<String> childrenSet = Sets.newHashSet(toChildren.children);
         for (String n : fromChildren.children) {
             if (!childrenSet.contains(n)) {
-                w.tag('-').value(n).newline();
+                w.tag('-').value(n);
             } else {
                 String path = concat(parentPath, n);
                 DocumentNodeState n1 = getNode(path, fromRev);
@@ -2172,14 +2272,14 @@ public final class DocumentNodeStore
                 checkNotNull(n1, "Node at [%s] not found for fromRev [%s]", path, fromRev);
                 checkNotNull(n2, "Node at [%s] not found for toRev [%s]", path, toRev);
                 if (!n1.getLastRevision().equals(n2.getLastRevision())) {
-                    w.tag('^').key(n).object().endObject().newline();
+                    w.tag('^').key(n).object().endObject();
                 }
             }
         }
         childrenSet = Sets.newHashSet(fromChildren.children);
         for (String n : toChildren.children) {
             if (!childrenSet.contains(n)) {
-                w.tag('+').key(n).object().endObject().newline();
+                w.tag('+').key(n).object().endObject();
             }
         }
     }
@@ -2465,6 +2565,12 @@ public final class DocumentNodeStore
     public VersionGarbageCollector getVersionGarbageCollector() {
         return versionGarbageCollector;
     }
+
+    @Nonnull
+    public JournalGarbageCollector getJournalGarbageCollector() {
+        return journalGarbageCollector;
+    }
+
     @Nonnull
     public LastRevRecoveryAgent getLastRevRecoveryAgent() {
         return lastRevRecoveryAgent;
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
index d5c5112..d766de1 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
@@ -33,6 +33,7 @@ import com.mongodb.DB;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoClientURI;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.ConfigurationPolicy;
@@ -164,6 +165,23 @@ public class DocumentNodeStoreService {
      */
     public static final String CUSTOM_BLOB_STORE = "customBlobStore";

+    private static final long DEFAULT_JOURNAL_GC_INTERVAL_MILLIS = 5*60*1000; // default is 5 min
+    @Property(longValue = DEFAULT_JOURNAL_GC_INTERVAL_MILLIS,
+            label = "Journal Garbage Collection Interval (millis)",
+            description = "Long value indicating interval (in milliseconds) with which the "
+                    + "journal (for external changes) is cleaned up. Default is " + DEFAULT_JOURNAL_GC_INTERVAL_MILLIS
+    )
+    private static final String PROP_JOURNAL_GC_INTERVAL_MILLIS = "journalGCInterval";
+
+    private static final long DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS = 6*60*60*1000; // default is 6 hours
+    @Property(longValue = DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS,
+            label = "Maximum Age of Journal Entries (millis)",
+            description = "Long value indicating max age (in milliseconds) that "
+                    + "journal (for external changes) entries are kept (older ones are candidates for gc). "
+                    + "Default is " + DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS
+    )
+    private static final String PROP_JOURNAL_GC_MAX_AGE_MILLIS = "journalGCMaxAge";
+
     private static final long MB = 1024 * 1024;

     private static enum DocumentStoreType {
@@ -339,6 +357,7 @@ public class DocumentNodeStoreService {

         registerJMXBeans(mk.getNodeStore());
         registerLastRevRecoveryJob(mk.getNodeStore());
+        registerJournalGC(mk.getNodeStore());

         NodeStore store;
         if (useMK) {
@@ -518,6 +537,23 @@ public class DocumentNodeStoreService {
                 recoverJob, TimeUnit.MILLISECONDS.toSeconds(leaseTime)));
     }

+    private void registerJournalGC(final DocumentNodeStore nodeStore) {
+        long journalGCInterval = toLong(context.getProperties().get(PROP_JOURNAL_GC_INTERVAL_MILLIS),
+                DEFAULT_JOURNAL_GC_INTERVAL_MILLIS);
+        final long journalGCMaxAge = toLong(context.getProperties().get(PROP_JOURNAL_GC_MAX_AGE_MILLIS),
+                DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS);
+        Runnable journalGCJob = new Runnable() {
+
+            @Override
+            public void run() {
+                nodeStore.getJournalGarbageCollector().gc(journalGCMaxAge, TimeUnit.MILLISECONDS);
+            }
+
+        };
+        registrations.add(WhiteboardUtils.scheduleWithFixedDelay(whiteboard,
+                journalGCJob, TimeUnit.MILLISECONDS.toSeconds(journalGCInterval), true/*runOnSingleClusterNode*/));
+    }
+
     private Object prop(String propName) {
         return prop(propName, PREFIX + propName);
     }
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java
index 310ffe0..605f481 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentStore.java
@@ -223,6 +223,13 @@ public interface DocumentStore {
     CacheInvalidationStats invalidateCache();

     /**
+     * Invalidate the document cache, but only for entries that match one
+     * of the keys provided.
+     */
+    @CheckForNull
+    CacheInvalidationStats invalidateCache(Iterable<String> keys);
+
+    /**
      * Invalidate the document cache for the given key.
     *
     * @param <T> the document type
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
new file mode 100644
index 0000000..2a5a7e7
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
+import org.apache.jackrabbit.oak.commons.json.JsopReader;
+import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
+import org.apache.jackrabbit.oak.commons.sort.StringSort;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
+
+/**
+ * Keeps track of changes performed between two consecutive background updates.
+ */
+public final class JournalEntry extends Document {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JournalEntry.class);
+
+    /**
+     * The revision format for external changes:
+     * <clusterId>-<timestamp>-<counter>. The string is prefixed with
+     * "b" if it denotes a branch revision.
+     */
+    private static final String REVISION_FORMAT = "%d-%0" +
+            Long.toHexString(Long.MAX_VALUE).length() + "x-%0" +
+            Integer.toHexString(Integer.MAX_VALUE).length() + "x";
+
+    private static final String CHANGES = "_c";
+
+    private static final String BRANCH_COMMITS = "_bc";
+
+    private static final int READ_CHUNK_SIZE = 100;
+
+    /**
+     * switch to disk after 1MB
+     */
+    private static final int STRING_SORT_OVERFLOW_TO_DISK_THRESHOLD = 1024 * 1024;
+
+    private final DocumentStore store;
+
+    private volatile TreeNode changes = null;
+
+    JournalEntry(DocumentStore store) {
+        this.store = store;
+    }
+
+    static StringSort newSorter() {
+        return new StringSort(STRING_SORT_OVERFLOW_TO_DISK_THRESHOLD, new Comparator<String>() {
+            @Override
+            public int compare(String arg0, String arg1) {
+                return arg0.compareTo(arg1);
+            }
+        });
+    }
+
+    static void applyTo(@Nonnull StringSort externalSort,
+                        @Nonnull DiffCache diffCache,
+                        @Nonnull Revision from,
+                        @Nonnull Revision to) throws IOException {
+        LOG.debug("applyTo: starting for {} to {}", from, to);
+        // note that it is not de-duplicated yet
+        LOG.debug("applyTo: sorting done.");
+
+        final DiffCache.Entry entry = checkNotNull(diffCache).newEntry(from, to, false);
+
+        final Iterator<String> it = externalSort.getIds();
+        if (!it.hasNext()) {
+            // nothing at all? that's quite unusual..
+
+            // we apply this diff as one '/' to the entry then
+            entry.append("/", "");
+            entry.done();
+            return;
+        }
+        String previousPath = it.next();
+        TreeNode node = new TreeNode();
+        node = node.getOrCreatePath(previousPath);
+        int totalCnt = 0;
+        int deDuplicatedCnt = 0;
+        while (it.hasNext()) {
+            totalCnt++;
+            final String currentPath = it.next();
+            if (previousPath.equals(currentPath)) {
+                // de-duplication
+                continue;
+            }
+            final TreeNode currentNode = node.getOrCreatePath(currentPath);
+
+            // 'node' contains one hierarchy line, eg /a, /a/b, /a/b/c, /a/b/c/d
+            // including the children on each level.
+            // these children have not yet been appended to the diffCache entry
+            // and have to be added as soon as the 'currentPath' is not
+            // part of that hierarchy anymore and we 'move elsewhere'.
+            // eg if 'currentPath' is /a/b/e, then we must flush /a/b/c/d and /a/b/c
+            while (node != null && !node.isAncestorOf(currentNode)) {
+                // add parent to the diff entry
+                entry.append(node.getPath(), getChanges(node));
+                deDuplicatedCnt++;
+                node = node.parent;
+            }
+
+            if (node == null) {
+                // we should never go 'past' the root, hence node should
+                // never be null - if it becomes null anyway, start with
+                // a fresh root:
+                node = new TreeNode();
+                node = node.getOrCreatePath(currentPath);
+            } else {
+                // this is the normal route: we add a direct or grand-child
+                // node to the current node:
+                node = currentNode;
+            }
+            previousPath = currentPath;
+        }
+
+        // once we're done we still have the last hierarchy line contained in 'node',
+        // eg /x, /x/y, /x/y/z
+        // and that one we must now append to the diff cache entry:
+        while (node != null) {
+            entry.append(node.getPath(), getChanges(node));
+            deDuplicatedCnt++;
+            node = node.parent;
+        }
+
+        // and finally: mark the diff cache entry as 'done':
+        entry.done();
+        LOG.debug("applyTo: done. totalCnt: {}, deDuplicatedCnt: {}", totalCnt, deDuplicatedCnt);
+    }
+
+    /**
+     * Reads all external changes between the two given revisions (with the same
+     * clusterId) from the journal and appends the paths therein to the provided
+     * sorter.
+     *
+     * @param sorter the StringSort to which all externally changed paths
+     *               between the provided revisions will be added
+     * @param from the lower bound of the revision range (exclusive).
+     * @param to the upper bound of the revision range (inclusive).
+     * @param store the document store to query.
+     * @throws IOException
+     */
+    static void fillExternalChanges(@Nonnull StringSort sorter,
+                                    @Nonnull Revision from,
+                                    @Nonnull Revision to,
+                                    @Nonnull DocumentStore store)
+            throws IOException {
+        checkArgument(checkNotNull(from).getClusterId() == checkNotNull(to).getClusterId());
+
+        // to is inclusive, but DocumentStore.query() toKey is exclusive
+        final String inclusiveToId = asId(to);
+        to = new Revision(to.getTimestamp(), to.getCounter() + 1,
+                to.getClusterId(), to.isBranch());
+
+        // read in chunks to support very large sets of changes between
+        // subsequent background reads. to do this, provide a
+        // (TODO: eventually configurable) limit for the number of entries
+        // to be returned per query. if the number of elements returned by
+        // the query is exactly the provided limit, then loop and do
+        // subsequent queries
+        final String toId = asId(to);
+        String fromId = asId(from);
+        while (true) {
+            if (fromId.equals(inclusiveToId)) {
+                // avoid query if from and to are off by just 1 counter (which
+                // we do due to exclusiveness of query borders) as in this case
+                // the query will always be empty anyway - so avoid doing the
+                // query in the first place
+                break;
+            }
+            List<JournalEntry> partialResult = store.query(JOURNAL, fromId, toId, READ_CHUNK_SIZE);
+
+            for (JournalEntry d : partialResult) {
+                d.addTo(sorter);
+            }
+            if (partialResult.size() < READ_CHUNK_SIZE) {
+                break;
+            }
+            // otherwise set 'fromId' to the last entry just processed
+            // that works fine as the query is non-inclusive (ie does not
+            // include the from which we'd otherwise double-process)
+            fromId = partialResult.get(partialResult.size() - 1).getId();
+        }
+    }
+
+    long getRevisionTimestamp() {
+        final String[] parts = getId().split("-");
+        return Long.parseLong(parts[1], 16);
+    }
+
+    void modified(String path) {
+        TreeNode node = getChanges();
+        for (String name : PathUtils.elements(path)) {
+            node = node.getOrCreate(name);
+        }
+    }
+
+    void modified(Iterable<String> paths) {
+        for (String p : paths) {
+            modified(p);
+        }
+    }
+
+    void branchCommit(@Nonnull Iterable<Revision> revisions) {
+        String branchCommits = (String) get(BRANCH_COMMITS);
+        if (branchCommits == null) {
+            branchCommits = "";
+        }
+        for (Revision r : revisions) {
+            if (branchCommits.length() > 0) {
+                branchCommits += ",";
+            }
+            branchCommits += asId(r.asBranchRevision());
+        }
+        put(BRANCH_COMMITS, branchCommits);
+    }
+
+    String getChanges(String path) {
+        TreeNode node = getNode(path);
+        if (node == null) {
+            return "";
+        }
+        return getChanges(node);
+    }
+
+    UpdateOp asUpdateOp(@Nonnull Revision revision) {
+        String id = asId(revision);
+        UpdateOp op = new UpdateOp(id, true);
+        op.set(ID, id);
+        op.set(CHANGES, getChanges().serialize());
+        String bc = (String) get(BRANCH_COMMITS);
+        if (bc != null) {
+            op.set(BRANCH_COMMITS, bc);
+        }
+        return op;
+    }
+
+    void addTo(final StringSort sort) throws IOException {
+        TreeNode n = getChanges();
+        TraversingVisitor v = new TraversingVisitor() {
+
+            @Override
+            public void node(TreeNode node, String path) throws IOException {
+                sort.add(path);
+            }
+        };
+        n.accept(v, "/");
+        for (JournalEntry e : getBranchCommits()) {
+            e.getChanges().accept(v, "/");
+        }
+    }
+
+    /**
+     * Returns the branch commits that are related to this journal entry.
+     *
+     * @return the branch commits.
+     */
+    @Nonnull
+    Iterable<JournalEntry> getBranchCommits() {
+        final List<String> ids = Lists.newArrayList();
+        String bc = (String) get(BRANCH_COMMITS);
+        if (bc != null) {
+            for (String id : bc.split(",")) {
+                ids.add(id);
+            }
+        }
+        return new Iterable<JournalEntry>() {
+            @Override
+            public Iterator<JournalEntry> iterator() {
+                return new AbstractIterator<JournalEntry>() {
+
+                    private final Iterator<String> it = ids.iterator();
+
+                    @Override
+                    protected JournalEntry computeNext() {
+                        if (!it.hasNext()) {
+                            return endOfData();
+                        }
+                        String id = it.next();
+                        JournalEntry d = store.find(JOURNAL, id);
+                        if (d == null) {
+                            throw new IllegalStateException(
+                                    "Missing external change for branch revision: " + id);
+                        }
+                        return d;
+                    }
+                };
+            }
+        };
+    }
+
+    //-----------------------------< internal >---------------------------------
+
+    private static String getChanges(TreeNode node) {
+        JsopBuilder builder = new JsopBuilder();
+        for (String name : node.keySet()) {
+            builder.tag('^');
+            builder.key(name);
+            builder.object().endObject();
+        }
+        return builder.toString();
+    }
+
+    static String asId(@Nonnull Revision revision) {
+        checkNotNull(revision);
+        String s = String.format(REVISION_FORMAT, revision.getClusterId(), revision.getTimestamp(), revision.getCounter());
+        if (revision.isBranch()) {
+            s = "b" + s;
+        }
+        return s;
+    }
+
+    @CheckForNull
+    private TreeNode getNode(String path) {
+        TreeNode node = getChanges();
+        for (String name : PathUtils.elements(path)) {
+            node = node.get(name);
+            if (node == null) {
+                return null;
+            }
+        }
+        return node;
+    }
+
+    @Nonnull
+    private TreeNode getChanges() {
+        if (changes == null) {
+            TreeNode node = new TreeNode();
+            String c = (String) get(CHANGES);
+            if (c != null) {
+                node.parse(new JsopTokenizer(c));
+            }
+            changes = node;
+        }
+        return changes;
+    }
+
+    private static final class TreeNode {
+
+        private static final Map<String, TreeNode> NO_CHILDREN = Collections.emptyMap();
+
+        private Map<String, TreeNode> children = NO_CHILDREN;
+
+        private final TreeNode parent;
+        private final String name;
+
+        TreeNode() {
+            this(null, "");
+        }
+
+        TreeNode(TreeNode parent, String name) {
+            checkArgument(!name.contains("/"),
+                    "name must not contain '/': {}", name);
+
+            this.parent = parent;
+            this.name = name;
+        }
+
+        TreeNode getOrCreatePath(String path) {
+            TreeNode n = getRoot();
+            for (String name : PathUtils.elements(path)) {
+                n = n.getOrCreate(name);
+            }
+            return n;
+        }
+
+        boolean isAncestorOf(TreeNode other) {
+            TreeNode n = other;
+            while (n.parent != null) {
+                if (this == n.parent) {
+                    return true;
+                }
+                n = n.parent;
+            }
+            return false;
+        }
+
+        @Nonnull
+        private TreeNode getRoot() {
+            TreeNode n = this;
+            while (n.parent != null) {
+                n = n.parent;
+            }
+            return n;
+        }
+
+        private String getPath() {
+            return buildPath(new StringBuilder()).toString();
+        }
+
+        private StringBuilder buildPath(StringBuilder sb) {
+            if (parent != null) {
+                parent.buildPath(sb);
+                if (parent.parent != null) {
+                    // only add slash if parent is not the root
+                    sb.append("/");
+                }
+            } else {
+                // this is the root
+                sb.append("/");
+            }
+            sb.append(name);
+            return sb;
+        }
+
+        void parse(JsopReader reader) {
+            reader.read('{');
+            if (!reader.matches('}')) {
+                do {
+                    String name = Utils.unescapePropertyName(reader.readString());
+                    reader.read(':');
+                    getOrCreate(name).parse(reader);
+                } while (reader.matches(','));
+                reader.read('}');
+            }
+        }
+
+        String serialize() {
+            JsopBuilder builder = new JsopBuilder();
+            builder.object();
+            toJson(builder);
+            builder.endObject();
+            return builder.toString();
+        }
+
+        @Nonnull
+        Set<String> keySet() {
+            return children.keySet();
+        }
+
+        @CheckForNull
+        TreeNode get(String name) {
+            return children.get(name);
+        }
+
+        void accept(TraversingVisitor visitor, String path) throws IOException {
+            visitor.node(this, path);
+            for (Map.Entry<String, TreeNode> entry : children.entrySet()) {
+                entry.getValue().accept(visitor, concat(path, entry.getKey()));
+            }
+        }
+
+        private void toJson(JsopBuilder builder) {
+            for (Map.Entry<String, TreeNode> entry : children.entrySet()) {
+                builder.key(Utils.escapePropertyName(entry.getKey()));
+                builder.object();
+                entry.getValue().toJson(builder);
+                builder.endObject();
+            }
+        }
+
+        @Nonnull
+        private TreeNode getOrCreate(String name) {
+            if (children == NO_CHILDREN) {
+                children = Maps.newHashMap();
+            }
+            TreeNode c = children.get(name);
+            if (c == null) {
+                c = new TreeNode(this, name);
+                children.put(name, c);
+            }
+            return c;
+        }
+    }
+
+    private interface TraversingVisitor {
+
+        void node(TreeNode node, String path) throws IOException;
+    }
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java
new file mode 100644
index 0000000..1c8e56f
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalGarbageCollector.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * The JournalGarbageCollector can clean up JournalEntries that are older than a
+ * particular age.
+ *
+ * It would typically be invoked in conjunction with the VersionGarbageCollector + * but must not be confused with that one - 'journal' refers to the separate + * collection that contains changed paths per background writes used for + * observation. + */ +public class JournalGarbageCollector { + + //copied from VersionGarbageCollector: + private static final int DELETE_BATCH_SIZE = 450; + + private final DocumentStore ds; + + private static final Logger log = LoggerFactory.getLogger(JournalGarbageCollector.class); + + public JournalGarbageCollector(DocumentNodeStore nodeStore) { + this.ds = nodeStore.getDocumentStore(); + } + + /** + * Deletes entries in the journal that are older than the given + * maxRevisionAge. + * + * @param maxRevisionAge entries older than this age will be removed + * @param unit the timeunit for maxRevisionAge + * @return the number of entries that have been removed + */ + public int gc(long maxRevisionAge, TimeUnit unit) { + long maxRevisionAgeInMillis = unit.toMillis(maxRevisionAge); + if (log.isDebugEnabled()) { + log.debug("gc: Journal garbage collection starts with maxAge: {} min.", TimeUnit.MILLISECONDS.toMinutes(maxRevisionAgeInMillis)); + } + Stopwatch sw = Stopwatch.createStarted(); + + // the journal has ids of the following format: + // 1-0000014db9aaf710-00000001 + // whereas the first number is the cluster node id. + // now, this format prevents from doing a generic + // query to get all 'old' entries, as the documentstore + // can only query for a sequential list of entries. + // (and the cluster node id here partitions the set + // of entries that we have to delete) + // To account for that, we simply iterate over all + // cluster node ids and clean them up individually. + // Note that there are possible alternatives, such + // as: let each node clean up its own old entries + // but the chosen path is also quite simple: it can + // be started on any instance - but best on only one. + // if it's run on multiple concurrently, then they + // will compete at deletion, which is not optimal + // due to performance, but does not harm. + + // 1. get the list of cluster node ids + final List clusterNodeInfos = ClusterNodeInfoDocument.all(ds); + int numDeleted = 0; + for (ClusterNodeInfoDocument clusterNodeInfoDocument : clusterNodeInfos) { + // current algorithm is to simply look at all cluster nodes + // irrespective of whether they are active or inactive etc. + // this could be optimized for inactive ones: at some point, all + // journal entries of inactive ones would have been cleaned up + // and at that point we could stop including those long-time-inactive ones. + // that 'long time' aspect would have to be tracked though, to be sure + // we don't leave garbage. + // so simpler is to quickly do a query even for long-time inactive ones + final int clusterNodeId = clusterNodeInfoDocument.getClusterId(); + + // 2. 
iterate over that list and do a query with + // a limit of 'batch size' + boolean branch = false; + long startPointer = 0; + while (true) { + String fromKey = JournalEntry.asId(new Revision(startPointer, 0, clusterNodeId, branch)); + String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis() - maxRevisionAgeInMillis, Integer.MAX_VALUE, clusterNodeId, branch)); + int limit = DELETE_BATCH_SIZE; + List deletionBatch = ds.query(Collection.JOURNAL, fromKey, toKey, limit); + if (deletionBatch.size() > 0) { + ds.remove(Collection.JOURNAL, asKeys(deletionBatch)); + numDeleted += deletionBatch.size(); + } + if (deletionBatch.size() < limit) { + if (!branch) { + // do the same for branches: + // this will start at the beginning again with branch set to true + // and eventually finish too + startPointer = 0; + branch = true; + continue; + } + break; + } + startPointer = deletionBatch.get(deletionBatch.size() - 1).getRevisionTimestamp(); + } + } + + sw.stop(); + + log.info("gc: Journal garbage collection took {}, deleted {} entries that were older than {} min.", sw, numDeleted, TimeUnit.MILLISECONDS.toMinutes(maxRevisionAgeInMillis)); + return numDeleted; + } + + private List asKeys(List deletionBatch) { + final List keys = new ArrayList(deletionBatch.size()); + for (JournalEntry e : deletionBatch) { + keys.add(e.getId()); + } + return keys; + } + +} diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java index 4ce7780..7abbf74 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java @@ -22,6 +22,8 @@ package org.apache.jackrabbit.oak.plugins.document; import static com.google.common.collect.ImmutableList.of; import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.mergeSorted; +import static java.util.Collections.singletonList; +import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL; import java.io.IOException; import java.util.Iterator; @@ -126,6 +128,8 @@ public class LastRevRecoveryAgent { //Map of known last rev of checked paths UnsavedModifications knownLastRevs = new UnsavedModifications(); closer.register(knownLastRevs); + final DocumentStore docStore = nodeStore.getDocumentStore(); + final JournalEntry changes = JOURNAL.newDocument(docStore); while (suspects.hasNext()) { NodeDocument doc = suspects.next(); @@ -153,6 +157,7 @@ public class LastRevRecoveryAgent { //2. Update lastRev for parent paths aka rollup if (lastRevForParents != null) { String path = doc.getPath(); + changes.modified(path); // track all changes while (true) { if (PathUtils.denotesRoot(path)) { break; @@ -176,6 +181,9 @@ public class LastRevRecoveryAgent { } } + // take the root's lastRev + final Revision lastRootRev = unsaved.get("/"); + //Note the size before persist as persist operation //would empty the internal state int size = unsaved.getPaths().size(); @@ -184,7 +192,41 @@ public class LastRevRecoveryAgent { //UnsavedModifications is designed to be used in concurrent //access mode. 
For recovery case there is no concurrent access //involve so just pass a new lock instance - unsaved.persist(nodeStore, new ReentrantLock()); + + // the lock uses to do the persisting is a plain reentrant lock + // thus it doesn't matter, where exactly the check is done + // as to whether the recovered lastRev has already been + // written to the journal. + unsaved.persist(nodeStore, new UnsavedModifications.Snapshot() { + + @Override + public void acquiring() { + if (lastRootRev == null) { + // this should never happen - when unsaved has no changes + // that is reflected in the 'map' to be empty - in that + // case 'persist()' quits early and never calls + // acquiring() here. + // + // but even if it would occur - if we have no lastRootRev + // then we cannot and probably don't have to persist anything + return; + } + + final String id = JournalEntry.asId(lastRootRev); // lastRootRev never null at this point + final JournalEntry existingEntry = docStore.find(Collection.JOURNAL, id); + if (existingEntry != null) { + // then the journal entry was already written - as can happen if + // someone else (or the original instance itself) wrote the + // journal entry, then died. + // in this case, don't write it again. + // hence: nothing to be done here. return. + return; + } + + // otherwise store a new journal entry now + docStore.create(JOURNAL, singletonList(changes.asUpdateOp(lastRootRev))); + } + }, new ReentrantLock()); log.info("Updated lastRev of [{}] documents while performing lastRev recovery for " + "cluster node [{}]: {}", size, clusterId, updates); diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java index d25518d..84d27a7 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LocalDiffCache.java @@ -73,7 +73,8 @@ public class LocalDiffCache implements DiffCache { @Nonnull @Override public Entry newEntry(final @Nonnull Revision from, - final @Nonnull Revision to) { + final @Nonnull Revision to, + boolean local /*ignored*/) { return new Entry() { private final Map changesPerPath = Maps.newHashMap(); private int size; diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java index f962e00..855cdb2 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MemoryDiffCache.java @@ -80,7 +80,8 @@ public class MemoryDiffCache implements DiffCache { @Nonnull @Override public Entry newEntry(@Nonnull Revision from, - @Nonnull Revision to) { + @Nonnull Revision to, + boolean local /*ignored*/) { return new MemoryEntry(from, to); } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java index ac3b4b1..86b4516 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MergeCommit.java @@ -16,8 +16,13 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import java.util.Set; import java.util.SortedSet; +import javax.annotation.Nonnull; + +import 
com.google.common.collect.Sets; + /** * A merge commit containing multiple commit revisions. One for each branch * commit to merge. @@ -25,6 +30,7 @@ import java.util.SortedSet; class MergeCommit extends Commit { private final SortedSet<Revision> mergeRevs; + private final Set<Revision> branchCommits = Sets.newHashSet(); MergeCommit(DocumentNodeStore nodeStore, Revision baseRevision, @@ -37,8 +43,18 @@ class MergeCommit extends Commit { return mergeRevs; } + void addBranchCommits(@Nonnull Branch branch) { + for (Revision r : branch.getCommits()) { + if (!branch.getCommit(r).isRebase()) { + branchCommits.add(r); + } + } + } + @Override public void applyToCache(Revision before, boolean isBranchCommit) { - // do nothing for a merge commit + // do nothing for a merge commit, only notify the + // node store about merged revisions + nodeStore.revisionsMerged(branchCommits); } } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java index df227d2..e10d1ef 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/TieredDiffCache.java @@ -29,8 +29,8 @@ import org.apache.jackrabbit.oak.cache.CacheStats; */ class TieredDiffCache implements DiffCache { - private final LocalDiffCache localCache; - private final MemoryDiffCache memoryCache; + private final DiffCache localCache; + private final DiffCache memoryCache; TieredDiffCache(DocumentMK.Builder builder) { this.localCache = new LocalDiffCache(builder); @@ -51,7 +51,8 @@ class TieredDiffCache implements DiffCache { } /** - * Creates a new entry in the {@link LocalDiffCache} only! + * Creates a new entry in the {@link LocalDiffCache} for local changes + * and in the {@link MemoryDiffCache} for external changes. * * @param from the from revision. * @param to the to revision. @@ -59,8 +60,12 @@ */ @Nonnull @Override - public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to) { - return localCache.newEntry(from, to); + public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to, boolean local) { + if (local) { + return localCache.newEntry(from, to, true); + } else { + return memoryCache.newEntry(from, to, false); + } } @Nonnull diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java index 8fb5d8d..4b156fd 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java @@ -162,11 +162,14 @@ * lock for a short period of time. * * @param store the document node store. + * @param snapshot callback invoked when the snapshot of the pending + * changes is acquired. * @param lock the lock to acquire to get a consistent snapshot of the * revisions to write back. * @return stats about the write operation.
*/ public BackgroundWriteStats persist(@Nonnull DocumentNodeStore store, + @Nonnull Snapshot snapshot, @Nonnull Lock lock) { BackgroundWriteStats stats = new BackgroundWriteStats(); if (map.size() == 0) { @@ -178,13 +181,14 @@ Clock clock = store.getClock(); long time = clock.getTime(); - // get a copy of the map while holding the lock + // get a copy of the map while holding the lock lock.lock(); MapFactory tmpFactory = null; Map<String, Revision> pending; try { stats.lock = clock.getTime() - time; time = clock.getTime(); + snapshot.acquiring(); if (map.size() > IN_MEMORY_SIZE_LIMIT) { tmpFactory = MapFactory.createFactory(); pending = tmpFactory.create(PathComparator.INSTANCE); @@ -265,4 +269,15 @@ public String toString() { return map.toString(); } + + public interface Snapshot { + + Snapshot IGNORE = new Snapshot() { + @Override + public void acquiring() { + } + }; + + void acquiring(); + } } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java index aa29186..d9a6530 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/memory/MemoryDocumentStore.java @@ -35,6 +35,7 @@ import org.apache.jackrabbit.oak.plugins.document.Collection; import org.apache.jackrabbit.oak.plugins.document.Document; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; import org.apache.jackrabbit.oak.plugins.document.DocumentStoreException; +import org.apache.jackrabbit.oak.plugins.document.JournalEntry; import org.apache.jackrabbit.oak.plugins.document.NodeDocument; import org.apache.jackrabbit.oak.plugins.document.Revision; import org.apache.jackrabbit.oak.plugins.document.StableRevisionComparator; @@ -73,6 +74,12 @@ public class MemoryDocumentStore implements DocumentStore { private ConcurrentSkipListMap<String, Document> settings = new ConcurrentSkipListMap<String, Document>(); + /** + * The 'externalChanges' collection, which holds the {@code Collection.JOURNAL} documents.
+ */ + private ConcurrentSkipListMap<String, Document> externalChanges = + new ConcurrentSkipListMap<String, Document>(); + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); /** @@ -226,8 +233,10 @@ public class MemoryDocumentStore implements DocumentStore { return (ConcurrentSkipListMap<String, T>) nodes; } else if (collection == Collection.CLUSTER_NODES) { return (ConcurrentSkipListMap<String, T>) clusterNodes; - }else if (collection == Collection.SETTINGS) { + } else if (collection == Collection.SETTINGS) { return (ConcurrentSkipListMap<String, T>) settings; + } else if (collection == Collection.JOURNAL) { + return (ConcurrentSkipListMap<String, T>) externalChanges; } else { throw new IllegalArgumentException( "Unknown collection: " + collection.toString()); @@ -329,6 +338,11 @@ } @Override + public CacheInvalidationStats invalidateCache(Iterable<String> keys) { + return null; + } + + @Override public void dispose() { // ignore } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java index d4b6a56..38bf185 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java @@ -195,7 +195,7 @@ abstract class CacheInvalidator { PeekingIterator<TreeNode> pitr = Iterators.peekingIterator(treeItr); Map<String, TreeNode> sameLevelNodes = Maps.newHashMap(); - // Fetch only the lastRev map and id + // Fetch only the modCount and id final BasicDBObject keys = new BasicDBObject(Document.ID, 1); keys.put(Document.MOD_COUNT, 1); @@ -228,7 +228,7 @@ QueryBuilder query = QueryBuilder.start(Document.ID) .in(idBatch); - // Fetch lastRev and modCount for each such nodes + // Fetch modCount for each of these nodes DBCursor cursor = nodes.find(query.get(), keys); cursor.setReadPreference(ReadPreference.primary()); LOG.debug( diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java index 184a3e1..a22882c 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDiffCache.java @@ -91,7 +91,7 @@ if (changes == null && loader != null) { changes = loader.call(); // put into memory cache - super.newEntry(from, to).append(path, changes); + super.newEntry(from, to, false).append(path, changes); } return changes; } finally { @@ -102,7 +102,8 @@ @Nonnull @Override public Entry newEntry(@Nonnull final Revision from, - @Nonnull final Revision to) { + @Nonnull final Revision to, + boolean local /*ignored*/) { return new MemoryEntry(from, to) { private Diff commit = new Diff(from, to); @@ -172,7 +173,7 @@ // diff is complete LOG.debug("Built diff from {} commits", numCommits); // apply to diff cache and serve later requests from cache - d.applyToEntry(super.newEntry(from, to)).done(); + d.applyToEntry(super.newEntry(from, to, false)).done(); // return changes return d.getChanges(path); } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java index dc1f6ed..cb58471 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java @@ -66,6 +66,7 @@ import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats; import org.apache.jackrabbit.oak.plugins.document.cache.ForwardingListener; import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocOffHeapCache; import org.apache.jackrabbit.oak.plugins.document.cache.OffHeapCache; +import org.apache.jackrabbit.oak.plugins.document.mongo.CacheInvalidator.InvalidationResult; import org.apache.jackrabbit.oak.plugins.document.util.StringValue; import org.apache.jackrabbit.oak.plugins.document.util.Utils; import org.apache.jackrabbit.oak.stats.Clock; @@ -115,6 +116,7 @@ public class MongoDocumentStore implements DocumentStore { private final DBCollection nodes; private final DBCollection clusterNodes; private final DBCollection settings; + private final DBCollection journal; private final Cache nodesCache; private final CacheStats cacheStats; @@ -192,12 +194,10 @@ public class MongoDocumentStore implements DocumentStore { .put("version", version) .build(); - nodes = db.getCollection( - Collection.NODES.toString()); - clusterNodes = db.getCollection( - Collection.CLUSTER_NODES.toString()); - settings = db.getCollection( - Collection.SETTINGS.toString()); + nodes = db.getCollection(Collection.NODES.toString()); + clusterNodes = db.getCollection(Collection.CLUSTER_NODES.toString()); + settings = db.getCollection(Collection.SETTINGS.toString()); + journal = db.getCollection(Collection.JOURNAL.toString()); maxReplicationLagMillis = builder.getMaxReplicationLagMillis(); @@ -295,6 +295,59 @@ public class MongoDocumentStore implements DocumentStore { //that would lead to lesser number of queries return CacheInvalidator.createHierarchicalInvalidator(this).invalidateCache(); } + + @Override + public CacheInvalidationStats invalidateCache(Iterable keys) { + LOG.debug("invalidateCache: start"); + final InvalidationResult result = new InvalidationResult(); + int size = 0; + + final Iterator it = keys.iterator(); + while(it.hasNext()) { + // read chunks of documents only + final List ids = new ArrayList(IN_CLAUSE_BATCH_SIZE); + while(it.hasNext() && ids.size() < IN_CLAUSE_BATCH_SIZE) { + final String id = it.next(); + if (getCachedNodeDoc(id) != null) { + // only add those that we actually do have cached + ids.add(id); + } + } + size += ids.size(); + if (LOG.isTraceEnabled()) { + LOG.trace("invalidateCache: batch size: {} of total so far {}", + ids.size(), size); + } + + QueryBuilder query = QueryBuilder.start(Document.ID).in(ids); + // Fetch only the modCount and id + final BasicDBObject fields = new BasicDBObject(Document.ID, 1); + fields.put(Document.MOD_COUNT, 1); + + DBCursor cursor = nodes.find(query.get(), fields); + cursor.setReadPreference(ReadPreference.primary()); + result.queryCount++; + + for (DBObject obj : cursor) { + result.cacheEntriesProcessedCount++; + String id = (String) obj.get(Document.ID); + Number modCount = (Number) obj.get(Document.MOD_COUNT); + + CachedNodeDocument cachedDoc = getCachedNodeDoc(id); + if (cachedDoc != null + && !Objects.equal(cachedDoc.getModCount(), modCount)) { + invalidateCache(Collection.NODES, id); + result.invalidationCount++; + } else { + result.upToDateCount++; + } + } + } + + result.cacheSize 
= size; + LOG.trace("invalidateCache: end. total: {}", size); + return result; + } @Override public void invalidateCache(Collection collection, String key) { @@ -360,29 +413,30 @@ public class MongoDocumentStore implements DocumentStore { try { TreeLock lock = acquire(key); try { - if (maxCacheAge == 0) { - invalidateCache(collection, key); - } - while (true) { - doc = nodesCache.get(cacheKey, new Callable() { - @Override - public NodeDocument call() throws Exception { - NodeDocument doc = (NodeDocument) findUncached(collection, key, getReadPreference(maxCacheAge)); - if (doc == null) { - doc = NodeDocument.NULL; + if (maxCacheAge > 0 || preferCached) { + // try again some other thread may have populated + // the cache by now + doc = nodesCache.getIfPresent(cacheKey); + if (doc != null) { + if (preferCached || + getTime() - doc.getCreated() < maxCacheAge) { + if (doc == NodeDocument.NULL) { + return null; } - return doc; + return (T) doc; } - }); - if (maxCacheAge == 0 || preferCached) { - break; - } - if (getTime() - doc.getCreated() < maxCacheAge) { - break; } - // too old: invalidate, try again - invalidateCache(collection, key); } + final NodeDocument d = (NodeDocument) findUncached( + collection, key, + getReadPreference(maxCacheAge)); + invalidateCache(collection, key); + doc = nodesCache.get(cacheKey, new Callable() { + @Override + public NodeDocument call() throws Exception { + return d == null ? NodeDocument.NULL : d; + } + }); } finally { lock.unlock(); } @@ -393,6 +447,8 @@ public class MongoDocumentStore implements DocumentStore { } } catch (ExecutionException e) { t = e.getCause(); + } catch (RuntimeException e) { + t = e; } throw new DocumentStoreException("Failed to load document with " + key, t); } @@ -514,9 +570,13 @@ public class MongoDocumentStore implements DocumentStore { } DBObject query = queryBuilder.get(); String parentId = Utils.getParentIdFromLowerLimit(fromKey); + long lockTime = -1; final long start = PERFLOG.start(); - TreeLock lock = withLock ? acquireExclusive(parentId != null ? parentId : "") : null; + TreeLock lock = acquireExclusive(parentId != null ? 
parentId : ""); try { + if (start != -1) { + lockTime = System.currentTimeMillis() - start; + } DBCursor cursor = dbCollection.find(query).sort(BY_ID_ASC); if (!disableIndexHint) { cursor.hint(hint); @@ -574,7 +634,7 @@ public class MongoDocumentStore implements DocumentStore { if (lock != null) { lock.unlock(); } - PERFLOG.end(start, 1, "query for children from [{}] to [{}]", fromKey, toKey); + PERFLOG.end(start, 1, "query for children from [{}] to [{}], lock:{}", fromKey, toKey, lockTime); } } @@ -968,7 +1028,9 @@ public class MongoDocumentStore implements DocumentStore { return clusterNodes; } else if (collection == Collection.SETTINGS) { return settings; - }else { + } else if (collection == Collection.JOURNAL) { + return journal; + } else { throw new IllegalArgumentException( "Unknown collection: " + collection.toString()); } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java index 6e5d8be..f2a96ee 100755 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBDocumentStore.java @@ -282,6 +282,12 @@ public class RDBDocumentStore implements DocumentStore { } return null; } + + @Override + public CacheInvalidationStats invalidateCache(Iterable keys) { + //TODO: optimize me + return invalidateCache(); + } @Override public void invalidateCache(Collection collection, String id) { @@ -783,7 +789,7 @@ public class RDBDocumentStore implements DocumentStore { private Set tablesToBeDropped = new HashSet(); // table names - private String tnNodes, tnClusterNodes, tnSettings; + private String tnNodes, tnClusterNodes, tnSettings, tnJournal; // ratio between Java characters and UTF-8 encoding // a) single characters will fit into 3 bytes @@ -825,6 +831,7 @@ public class RDBDocumentStore implements DocumentStore { this.tnNodes = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.NODES)); this.tnClusterNodes = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.CLUSTER_NODES)); this.tnSettings = RDBJDBCTools.createTableName(options.getTablePrefix(), TABLEMAP.get(Collection.SETTINGS)); + this.tnJournal = RDBJDBCTools.createTableName(options.getTablePrefix(), "JOURNAL"); this.ch = new RDBConnectionHandler(ds); this.callStack = LOG.isDebugEnabled() ? 
new Exception("call stack of RDBDocumentStore creation") : null; @@ -878,6 +885,7 @@ public class RDBDocumentStore implements DocumentStore { createTableFor(con, Collection.CLUSTER_NODES, tablesCreated, tablesPresent, tableDiags); createTableFor(con, Collection.NODES, tablesCreated, tablesPresent, tableDiags); createTableFor(con, Collection.SETTINGS, tablesCreated, tablesPresent, tableDiags); + createTableFor(con, Collection.JOURNAL, tablesCreated, tablesPresent, tableDiags); } finally { con.commit(); con.close(); @@ -1314,6 +1322,8 @@ public class RDBDocumentStore implements DocumentStore { return this.tnNodes; } else if (collection == Collection.SETTINGS) { return this.tnSettings; + } else if (collection == Collection.JOURNAL) { + return this.tnJournal; } else { throw new IllegalArgumentException("Unknown collection: " + collection.toString()); } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java index 06b87a0..d52c874 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LoggingDocumentStoreWrapper.java @@ -251,6 +251,17 @@ public class LoggingDocumentStoreWrapper implements DocumentStore { throw convert(e); } } + + @Override + public CacheInvalidationStats invalidateCache(Iterable keys) { + try { + logMethod("invalidateCache", keys); + return store.invalidateCache(keys); + } catch (Exception e) { + logException(e); + throw convert(e); + } + } @Override public void invalidateCache(Collection collection, String key) { diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java index 1e3be7f..8cbb15c 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/SynchronizingDocumentStoreWrapper.java @@ -107,6 +107,11 @@ public class SynchronizingDocumentStoreWrapper implements DocumentStore { } @Override + public synchronized CacheInvalidationStats invalidateCache(Iterable keys) { + return store.invalidateCache(keys); + } + + @Override public synchronized void invalidateCache(Collection collection, String key) { store.invalidateCache(collection, key); } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java index 0739f47..ddcd565 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/TimingDocumentStoreWrapper.java @@ -282,6 +282,18 @@ public class TimingDocumentStoreWrapper implements DocumentStore { throw convert(e); } } + + @Override + public CacheInvalidationStats invalidateCache(Iterable keys) { + try { + long start = now(); + CacheInvalidationStats result = base.invalidateCache(keys); + updateAndLogTimes("invalidateCache3", start, 0, 0); + return result; + } catch (Exception e) { + throw convert(e); + } + } @Override public void invalidateCache(Collection collection, 
String key) { diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java index f75b2bc..8423756 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/Utils.java @@ -34,6 +34,7 @@ import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import com.google.common.base.Function; import com.google.common.collect.AbstractIterator; import com.mongodb.BasicDBObject; @@ -49,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Iterables.transform; /** * Utility methods. @@ -557,4 +559,31 @@ public class Utils { public static boolean isHiddenPath(@Nonnull String path) { return path.contains("/:"); } + + /** + * Transforms the given {@link Iterable} from {@link String} to + * {@link StringValue} elements. The {@link Iterable} must not have + * {@code null} values. + */ + public static Iterable<StringValue> asStringValueIterable( + @Nonnull Iterable<String> values) { + return transform(values, new Function<String, StringValue>() { + @Override + public StringValue apply(String input) { + return new StringValue(input); + } + }); + } + + /** + * Transforms the given paths into ids using {@link #getIdFromPath(String)}. + */ + public static Iterable<String> pathToId(@Nonnull Iterable<String> paths) { + return transform(paths, new Function<String, String>() { + @Override + public String apply(String input) { + return getIdFromPath(input); + } + }); + } } diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java index f59064d..c81dfae 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/NodeObserver.java @@ -161,7 +161,7 @@ public abstract class NodeObserver implements Observer { while (!generator.isDone()) { generator.generate(); } - PERF_LOGGER.end(start, 10, + PERF_LOGGER.end(start, 100, "Generated events (before: {}, after: {})", previousRoot, root); } catch (Exception e) { diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java new file mode 100644 index 0000000..cf214e5 --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AbstractJournalTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import com.google.common.collect.Lists; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.plugins.document.util.Utils; +import org.apache.jackrabbit.oak.spi.blob.BlobStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.junit.After; +import org.junit.Before; + +import static java.util.Arrays.asList; +import static org.junit.Assert.fail; + +/** + * Base class for journal related tests. + */ +public abstract class AbstractJournalTest { + + protected TestBuilder builder; + protected List<DocumentMK> mks = Lists.newArrayList(); + protected Random random; + + @Before + public void setup() { + random = new Random(); + } + + @Before + @After + public void clear() { + for (DocumentMK mk : mks) { + mk.dispose(); + } + mks.clear(); + } + + protected static void invalidateDocChildrenCache(DocumentNodeStore store) { + store.invalidateDocChildrenCache(); + } + + protected static void renewClusterIdLease(DocumentNodeStore store) { + store.renewClusterIdLease(); + } + + protected Set<String> choose(List<String> paths, int howMany) { + final Set<String> result = new HashSet<String>(); + while(result.size() < howMany) { + result.add(paths.get(random.nextInt(paths.size()))); + } + return result; + } + + protected List<String> createRandomPaths(int depth, int avgChildrenPerLevel, int num) { + final Set<String> result = new HashSet<String>(); + while(result.size() < num) { + result.add(createRandomPath(depth, avgChildrenPerLevel)); + } + return new LinkedList<String>(result); + } + + protected String createRandomPath(int depth, int avgChildrenPerLevel) { + StringBuilder sb = new StringBuilder(); + for(int i=0; i < depth; i++) { + sb.append("/"); + sb.append("r").append(random.nextInt(avgChildrenPerLevel)); + } + return sb.toString(); + } + + protected void getOrCreate(DocumentNodeStore ns, List<String> paths, boolean runBgOpsAfterCreation) throws CommitFailedException { + NodeBuilder rootBuilder = ns.getRoot().builder(); + for(String path:paths) { + doGetOrCreate(rootBuilder, path); + } + ns.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + if (runBgOpsAfterCreation) { + ns.runBackgroundOperations(); + } + } + + protected void getOrCreate(DocumentNodeStore ns, String path, boolean runBgOpsAfterCreation) throws CommitFailedException { + NodeBuilder rootBuilder = ns.getRoot().builder(); + doGetOrCreate(rootBuilder, path); + ns.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + if (runBgOpsAfterCreation) { + ns.runBackgroundOperations(); + } + } + + protected NodeBuilder doGetOrCreate(NodeBuilder builder, String path) { + String[] parts = path.split("/"); + for(int i=1; i < parts.length; i++) { + builder = builder.child(parts[i]); + } + return builder; + } + + protected void assertJournalEntries(DocumentNodeStore ds, String... expectedChanges) { + List<String> exp = new LinkedList<String>(asList(expectedChanges)); + for(boolean branch : new Boolean[]{false, true}) { + String fromKey = JournalEntry.asId(new Revision(0, 0, ds.getClusterId(), branch)); + String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch)); + List<JournalEntry> entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, expectedChanges.length+5); + if (entries.size()>0) { + for (JournalEntry journalEntry : entries) { + if (!exp.remove(journalEntry.get("_c"))) { + fail("Found an unexpected change: " + journalEntry.get("_c") + ", while all I expected was: " + asList(expectedChanges)); + } + } + } + } + if (exp.size()>0) { + fail("Did not find all expected changes, left over: "+exp+" (from original list which is: "+asList(expectedChanges)+")"); + } + } + + protected int countJournalEntries(DocumentNodeStore ds, int max) { + int total = 0; + for(boolean branch : new Boolean[]{false, true}) { + String fromKey = JournalEntry.asId(new Revision(0, 0,
ds.getClusterId(), branch)); + String toKey = JournalEntry.asId(new Revision(System.currentTimeMillis()+1000, 0, ds.getClusterId(), branch)); + List entries = ds.getDocumentStore().query(Collection.JOURNAL, fromKey, toKey, max); + total+=entries.size(); + } + return total; + } + + protected NodeDocument getDocument(DocumentNodeStore nodeStore, String path) { + return nodeStore.getDocumentStore().find(Collection.NODES, Utils.getIdFromPath(path)); + } + + protected TestBuilder newDocumentMKBuilder() { + return new TestBuilder(); + } + + protected DocumentMK createMK(int clusterId, int asyncDelay, + DocumentStore ds, BlobStore bs) { + builder = newDocumentMKBuilder(); + return register(builder.setDocumentStore(ds) + .setBlobStore(bs).setClusterId(clusterId) + .setAsyncDelay(asyncDelay).open()); + } + + protected DocumentMK register(DocumentMK mk) { + mks.add(mk); + return mk; + } + + protected final class TestBuilder extends DocumentMK.Builder { + CountingDocumentStore actualStore; + CountingTieredDiffCache actualDiffCache; + + @Override + public DocumentStore getDocumentStore() { + if (actualStore==null) { + actualStore = new CountingDocumentStore(super.getDocumentStore()); + } + return actualStore; + } + + @Override + public DiffCache getDiffCache() { + if (actualDiffCache==null) { + actualDiffCache = new CountingTieredDiffCache(this); + } + return actualDiffCache; + } + } +} diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java index 2b696d5..bce2f8b 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/AmnesiaDiffCache.java @@ -47,7 +47,7 @@ class AmnesiaDiffCache implements DiffCache { @Nonnull @Override - public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to) { + public Entry newEntry(@Nonnull Revision from, @Nonnull Revision to, boolean local) { return new Entry() { @Override public void append(@Nonnull String path, @Nonnull String changes) { diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java index 6aa4b03..6dd854e 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterTest.java @@ -370,6 +370,10 @@ public class ClusterTest { rootStates2.add((DocumentNodeState) root); } }); + + ns1.runBackgroundOperations(); + ns2.runBackgroundOperations(); + rootStates1.clear(); rootStates2.clear(); diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java new file mode 100644 index 0000000..ea09657 --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingDocumentStore.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nonnull; + +import org.apache.jackrabbit.oak.cache.CacheStats; +import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition; +import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key; +import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats; + +public class CountingDocumentStore implements DocumentStore { + + private DocumentStore delegate; + + //TODO: remove mec + boolean printStacks; + + class Stats { + + private int numFindCalls; + private int numQueryCalls; + private int numRemoveCalls; + private int numCreateOrUpdateCalls; + + } + + private Map collectionStats = new HashMap(); + + public CountingDocumentStore(DocumentStore delegate) { + this.delegate = delegate; + } + + public void resetCounters() { + collectionStats.clear(); + } + + public int getNumFindCalls(Collection collection) { + return getStats(collection).numFindCalls; + } + + public int getNumQueryCalls(Collection collection) { + return getStats(collection).numQueryCalls; + } + + public int getNumRemoveCalls(Collection collection) { + return getStats(collection).numRemoveCalls; + } + + public int getNumCreateOrUpdateCalls(Collection collection) { + return getStats(collection).numCreateOrUpdateCalls; + } + + private Stats getStats(Collection collection) { + if (!collectionStats.containsKey(collection)) { + Stats s = new Stats(); + collectionStats.put(collection, s); + return s; + } else { + return collectionStats.get(collection); + } + } + + @Override + public T find(Collection collection, String key) { + getStats(collection).numFindCalls++; + if (printStacks) { + new Exception("find [" + getStats(collection).numFindCalls + "] (" + collection + ") " + key).printStackTrace(); + } + return delegate.find(collection, key); + } + + @Override + public T find(Collection collection, + String key, + int maxCacheAge) { + getStats(collection).numFindCalls++; + if (printStacks) { + new Exception("find [" + getStats(collection).numFindCalls + "] (" + collection + ") " + key + " [max: " + maxCacheAge + "]").printStackTrace(); + } + return delegate.find(collection, key, maxCacheAge); + } + + @Nonnull + @Override + public List query(Collection collection, + String fromKey, + String toKey, + int limit) { + getStats(collection).numQueryCalls++; + if (printStacks) { + new Exception("query1 [" + getStats(collection).numQueryCalls + "] (" + collection + ") " + fromKey + ", to " + toKey + ". limit " + limit).printStackTrace(); + } + return delegate.query(collection, fromKey, toKey, limit); + } + + @Nonnull + @Override + public List query(Collection collection, + String fromKey, + String toKey, + String indexedProperty, + long startValue, + int limit) { + getStats(collection).numQueryCalls++; + if (printStacks) { + new Exception("query2 [" + getStats(collection).numQueryCalls + "] (" + collection + ") " + fromKey + ", to " + toKey + ". 
limit " + limit).printStackTrace(); + } + return delegate.query(collection, fromKey, toKey, indexedProperty, startValue, limit); + } + + @Override + public void remove(Collection collection, + String key) { + getStats(collection).numRemoveCalls++; + delegate.remove(collection, key); + } + + @Override + public void remove(Collection collection, + List keys) { + getStats(collection).numRemoveCalls++; + delegate.remove(collection, keys); + } + + @Override + public int remove(Collection collection, + Map> toRemove) { + getStats(collection).numRemoveCalls++; + return delegate.remove(collection, toRemove); + } + + @Override + public boolean create(Collection collection, + List updateOps) { + getStats(collection).numCreateOrUpdateCalls++; + return delegate.create(collection, updateOps); + } + + @Override + public void update(Collection collection, + List keys, + UpdateOp updateOp) { + getStats(collection).numCreateOrUpdateCalls++; + delegate.update(collection, keys, updateOp); + } + + @Override + public T createOrUpdate(Collection collection, + UpdateOp update) { + getStats(collection).numCreateOrUpdateCalls++; + return delegate.createOrUpdate(collection, update); + } + + @Override + public T findAndUpdate(Collection collection, + UpdateOp update) { + getStats(collection).numCreateOrUpdateCalls++; + return delegate.findAndUpdate(collection, update); + } + + @Override + public CacheInvalidationStats invalidateCache() { + return delegate.invalidateCache(); + } + + @Override + public CacheInvalidationStats invalidateCache(Iterable keys) { + return delegate.invalidateCache(keys); + } + + @Override + public void invalidateCache(Collection collection, + String key) { + delegate.invalidateCache(collection, key); + } + + @Override + public void dispose() { + delegate.dispose(); + } + + @Override + public T getIfCached(Collection collection, + String key) { + return delegate.getIfCached(collection, key); + } + + @Override + public void setReadWriteMode(String readWriteMode) { + delegate.setReadWriteMode(readWriteMode); + } + + @Override + public CacheStats getCacheStats() { + return delegate.getCacheStats(); + } + + @Override + public Map getMetadata() { + return delegate.getMetadata(); + } + +} diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java new file mode 100644 index 0000000..d7c0170 --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/CountingTieredDiffCache.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.plugins.document; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +public class CountingTieredDiffCache extends TieredDiffCache { + + class CountingLoader implements Loader { + + private Loader delegate; + + CountingLoader(Loader delegate) { + this.delegate = delegate; + } + + @Override + public String call() { + incLoadCount(); + return delegate.call(); + } + + } + + private int loadCount; + + public CountingTieredDiffCache(DocumentMK.Builder builder) { + super(builder); + } + + private void incLoadCount() { + loadCount++; + } + + public int getLoadCount() { + return loadCount; + } + + public void resetLoadCounter() { + loadCount = 0; + } + + @Override + public String getChanges(@Nonnull Revision from, + @Nonnull Revision to, + @Nonnull String path, + @Nullable Loader loader) { + return super.getChanges(from, to, path, new CountingLoader(loader)); + } +} diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java index 1f9accf..0ac5153 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java @@ -60,6 +60,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; @@ -102,8 +103,8 @@ public class DocumentNodeStoreTest { DocumentStore docStore = new MemoryDocumentStore(); DocumentStore testStore = new TimingDocumentStoreWrapper(docStore) { @Override - public CacheInvalidationStats invalidateCache() { - super.invalidateCache(); + public CacheInvalidationStats invalidateCache(Iterable keys) { + super.invalidateCache(keys); semaphore.acquireUninterruptibly(); semaphore.release(); return null; @@ -1667,7 +1668,7 @@ public class DocumentNodeStoreTest { merge(ns, builder); Revision to = ns.getHeadRevision(); - DiffCache.Entry entry = ns.getDiffCache().newEntry(from, to); + DiffCache.Entry entry = ns.getDiffCache().newEntry(from, to, true); entry.append("/", "-\"foo\""); entry.done(); diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java new file mode 100644 index 0000000..1be71fa --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.apache.jackrabbit.oak.commons.PathUtils; +import org.apache.jackrabbit.oak.commons.json.JsopReader; +import org.apache.jackrabbit.oak.commons.json.JsopTokenizer; +import org.apache.jackrabbit.oak.commons.sort.StringSort; +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.junit.Test; + +import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link JournalEntry}. + */ +public class JournalEntryTest { + + @Test + public void applyTo() throws Exception { + DiffCache cache = new MemoryDiffCache(new DocumentMK.Builder()); + List paths = Lists.newArrayList(); + addRandomPaths(paths); + StringSort sort = JournalEntry.newSorter(); + add(sort, paths); + Revision from = new Revision(1, 0, 1); + Revision to = new Revision(2, 0, 1); + sort.sort(); + JournalEntry.applyTo(sort, cache, from, to); + + for (String p : paths) { + String changes = cache.getChanges(from, to, p, null); + assertNotNull("missing changes for " + p, changes); + for (String c : getChildren(changes)) { + assertTrue(paths.contains(PathUtils.concat(p, c))); + } + } + sort.close(); + } + + @Test + public void fillExternalChanges() throws Exception { + DocumentStore store = new MemoryDocumentStore(); + JournalEntry entry = JOURNAL.newDocument(store); + Set paths = Sets.newHashSet(); + addRandomPaths(paths); + entry.modified(paths); + Revision r1 = new Revision(1, 0, 1); + Revision r2 = new Revision(2, 0, 1); + Revision r3 = new Revision(3, 0, 1); + UpdateOp op = entry.asUpdateOp(r2); + assertTrue(store.create(JOURNAL, Collections.singletonList(op))); + + StringSort sort = JournalEntry.newSorter(); + JournalEntry.fillExternalChanges(sort, r2, r3, store); + assertEquals(0, sort.getSize()); + + JournalEntry.fillExternalChanges(sort, r1, r2, store); + assertEquals(paths.size(), sort.getSize()); + sort.close(); + + sort = JournalEntry.newSorter(); + JournalEntry.fillExternalChanges(sort, r1, r3, store); + assertEquals(paths.size(), sort.getSize()); + sort.close(); + } + + @Test + public void getRevisionTimestamp() throws Exception { + DocumentStore store = new MemoryDocumentStore(); + JournalEntry entry = JOURNAL.newDocument(store); + entry.modified("/foo"); + Revision r = Revision.newRevision(1); + assertTrue(store.create(JOURNAL, + Collections.singletonList(entry.asUpdateOp(r)))); + entry = store.find(JOURNAL, JournalEntry.asId(r)); + assertEquals(r.getTimestamp(), entry.getRevisionTimestamp()); + } + + private static void addRandomPaths(java.util.Collection paths) throws IOException { + paths.add("/"); + Random random = new Random(42); + for (int i = 0; i < 1000; i++) { + String path = "/"; + int depth = random.nextInt(6); + for (int j = 0; j < depth; j++) { + char name = (char) ('a' + random.nextInt(26)); + path = PathUtils.concat(path, String.valueOf(name)); + paths.add(path); + } + } + } + + private static void add(StringSort sort, List paths) + throws IOException { + for (String p : paths) { + 
sort.add(p); + } + } + + private static List getChildren(String diff) { + List children = Lists.newArrayList(); + JsopTokenizer t = new JsopTokenizer(diff); + for (;;) { + int r = t.read(); + switch (r) { + case '^': { + children.add(t.readString()); + t.read(':'); + t.read('{'); + t.read('}'); + break; + } + case JsopReader.END: { + return children; + } + default: + fail("Unexpected token: " + r); + } + } + } +} diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java new file mode 100644 index 0000000..879f72b --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; + +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.commit.Observer; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.spi.state.NodeStateDiff; +import org.junit.Test; + +import static java.util.Collections.synchronizedList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class JournalTest extends AbstractJournalTest { + + private MemoryDocumentStore ds; + private MemoryBlobStore bs; + + class DiffingObserver implements Observer, Runnable, NodeStateDiff { + + final List incomingRootStates1 = Lists.newArrayList(); + final List diffedRootStates1 = Lists.newArrayList(); + + DocumentNodeState oldRoot = null; + + DiffingObserver(boolean startInBackground) { + if (startInBackground) { + // start the diffing in the background - so as to not + // interfere with the contentChanged call + Thread th = new Thread(this); + th.setDaemon(true); + th.start(); + } + } + + public void clear() { + synchronized(incomingRootStates1) { + incomingRootStates1.clear(); + diffedRootStates1.clear(); + } + } + + @Override + public void contentChanged(NodeState root, CommitInfo info) { + synchronized(incomingRootStates1) { + incomingRootStates1.add((DocumentNodeState) root); + incomingRootStates1.notifyAll(); + } + } + + 
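/** Drains every root state queued by contentChanged(), diffing each against the previously seen root on the caller's thread. */ +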
public void processAll() { + while(processOne()) { + // continue + } + } + + public boolean processOne() { + DocumentNodeState newRoot; + synchronized(incomingRootStates1) { + if (incomingRootStates1.size()==0) { + return false; + } + newRoot = incomingRootStates1.remove(0); + } + if (oldRoot!=null) { + newRoot.compareAgainstBaseState(oldRoot, this); + } + oldRoot = newRoot; + synchronized(incomingRootStates1) { + diffedRootStates1.add(newRoot); + } + return true; + } + + @Override + public void run() { + while(true) { + DocumentNodeState newRoot; + synchronized(incomingRootStates1) { + while(incomingRootStates1.size()==0) { + try { + incomingRootStates1.wait(); + } catch (InterruptedException e) { + // ignore + } + } + newRoot = incomingRootStates1.remove(0); + } + if (oldRoot!=null) { + newRoot.compareAgainstBaseState(oldRoot, this); + } + oldRoot = newRoot; + synchronized(incomingRootStates1) { + diffedRootStates1.add(newRoot); + } + } + } + + @Override + public boolean propertyAdded(PropertyState after) { + return true; + } + + @Override + public boolean propertyChanged(PropertyState before, PropertyState after) { + return true; + } + + @Override + public boolean propertyDeleted(PropertyState before) { + return true; + } + + @Override + public boolean childNodeAdded(String name, NodeState after) { + return true; + } + + @Override + public boolean childNodeChanged(String name, NodeState before, + NodeState after) { + return true; + } + + @Override + public boolean childNodeDeleted(String name, NodeState before) { + return true; + } + + public int getTotal() { + synchronized(incomingRootStates1) { + return incomingRootStates1.size() + diffedRootStates1.size(); + } + } + + } + + @Test + public void cleanupTest() throws Exception { + DocumentMK mk1 = createMK(0 /* clusterId: 0 => uses clusterNodes collection */, 0); + DocumentNodeStore ns1 = mk1.getNodeStore(); + // make sure we're visible and marked as active + ns1.renewClusterIdLease(); + JournalGarbageCollector gc = new JournalGarbageCollector(ns1); + // first clean up + gc.gc(1, TimeUnit.MILLISECONDS); + Thread.sleep(100); // sleep just quickly + assertEquals(0, gc.gc(1, TimeUnit.DAYS)); + assertEquals(0, gc.gc(6, TimeUnit.HOURS)); + assertEquals(0, gc.gc(1, TimeUnit.HOURS)); + assertEquals(0, gc.gc(10, TimeUnit.MINUTES)); + assertEquals(0, gc.gc(1, TimeUnit.MINUTES)); + assertEquals(0, gc.gc(1, TimeUnit.SECONDS)); + assertEquals(0, gc.gc(1, TimeUnit.MILLISECONDS)); + + // create some entries that can be deleted thereupon + mk1.commit("/", "+\"regular1\": {}", null, null); + mk1.commit("/", "+\"regular2\": {}", null, null); + mk1.commit("/", "+\"regular3\": {}", null, null); + mk1.commit("/regular2", "+\"regular4\": {}", null, null); + Thread.sleep(100); // sleep 100millis + assertEquals(0, gc.gc(5, TimeUnit.SECONDS)); + assertEquals(0, gc.gc(1, TimeUnit.MILLISECONDS)); + ns1.runBackgroundOperations(); + mk1.commit("/", "+\"regular5\": {}", null, null); + ns1.runBackgroundOperations(); + mk1.commit("/", "+\"regular6\": {}", null, null); + ns1.runBackgroundOperations(); + Thread.sleep(100); // sleep 100millis + assertEquals(0, gc.gc(5, TimeUnit.SECONDS)); + assertEquals(3, gc.gc(1, TimeUnit.MILLISECONDS)); + } + + @Test + public void journalTest() throws Exception { + DocumentMK mk1 = createMK(1, 0); + DocumentNodeStore ns1 = mk1.getNodeStore(); + CountingDocumentStore countingDocStore1 = builder.actualStore; + CountingTieredDiffCache countingDiffCache1 = builder.actualDiffCache; + + DocumentMK mk2 = createMK(2, 0); + DocumentNodeStore 
ns2 = mk2.getNodeStore(); + CountingDocumentStore countingDocStore2 = builder.actualStore; + CountingTieredDiffCache countingDiffCache2 = builder.actualDiffCache; + + final DiffingObserver observer = new DiffingObserver(false); + ns1.addObserver(observer); + + ns1.runBackgroundOperations(); + ns2.runBackgroundOperations(); + observer.processAll(); // to make sure we have an 'oldRoot' + observer.clear(); + countingDocStore1.resetCounters(); + countingDocStore2.resetCounters(); + // countingDocStore1.printStacks = true; + countingDiffCache1.resetLoadCounter(); + countingDiffCache2.resetLoadCounter(); + + mk2.commit("/", "+\"regular1\": {}", null, null); + mk2.commit("/", "+\"regular2\": {}", null, null); + mk2.commit("/", "+\"regular3\": {}", null, null); + mk2.commit("/regular2", "+\"regular4\": {}", null, null); + // flush to journal + ns2.runBackgroundOperations(); + + // nothing notified yet + assertEquals(0, observer.getTotal()); + assertEquals(0, countingDocStore1.getNumFindCalls(Collection.NODES)); + assertEquals(0, countingDocStore1.getNumQueryCalls(Collection.NODES)); + assertEquals(0, countingDocStore1.getNumRemoveCalls(Collection.NODES)); + assertEquals(0, countingDocStore1.getNumCreateOrUpdateCalls(Collection.NODES)); + assertEquals(0, countingDiffCache1.getLoadCount()); + + // let node 1 read those changes + // System.err.println("run background ops"); + ns1.runBackgroundOperations(); + mk2.commit("/", "+\"regular5\": {}", null, null); + ns2.runBackgroundOperations(); + ns1.runBackgroundOperations(); + // and let the observer process everything + observer.processAll(); + countingDocStore1.printStacks = false; + + // now expect 1 entry in rootStates + assertEquals(2, observer.getTotal()); + assertEquals(0, countingDiffCache1.getLoadCount()); + assertEquals(0, countingDocStore1.getNumRemoveCalls(Collection.NODES)); + assertEquals(0, countingDocStore1.getNumCreateOrUpdateCalls(Collection.NODES)); + assertEquals(0, countingDocStore1.getNumQueryCalls(Collection.NODES)); +// assertEquals(0, countingDocStore1.getNumFindCalls(Collection.NODES)); + } + + @Test + public void externalBranchChange() throws Exception { + DocumentMK mk1 = createMK(1, 0); + DocumentNodeStore ns1 = mk1.getNodeStore(); + DocumentMK mk2 = createMK(2, 0); + DocumentNodeStore ns2 = mk2.getNodeStore(); + + ns1.runBackgroundOperations(); + ns2.runBackgroundOperations(); + + mk1.commit("/", "+\"regular1\": {}", null, null); + // flush to journal + ns1.runBackgroundOperations(); + mk1.commit("/regular1", "+\"regular1child\": {}", null, null); + // flush to journal + ns1.runBackgroundOperations(); + mk1.commit("/", "+\"regular2\": {}", null, null); + // flush to journal + ns1.runBackgroundOperations(); + mk1.commit("/", "+\"regular3\": {}", null, null); + // flush to journal + ns1.runBackgroundOperations(); + mk1.commit("/", "+\"regular4\": {}", null, null); + // flush to journal + ns1.runBackgroundOperations(); + mk1.commit("/", "+\"regular5\": {}", null, null); + // flush to journal + ns1.runBackgroundOperations(); + String b1 = mk1.branch(null); + b1 = mk1.commit("/", "+\"branchVisible\": {}", b1, null); + mk1.merge(b1, null); + + // to flush the branch commit either dispose of mk1 + // or run the background operations explicitly + // (as that will propagate the lastRev to the root) + ns1.runBackgroundOperations(); + ns2.runBackgroundOperations(); + + String nodes = mk2.getNodes("/", null, 0, 0, 100, null); + 
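// after the journal round-trip above, cluster node 2 must see every change from node 1, including the merged branch commit +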
assertEquals("{\"branchVisible\":{},\"regular1\":{},\"regular2\":{},\"regular3\":{},\"regular4\":{},\"regular5\":{},\":childNodeCount\":6}", nodes); + } + + /** Inspired by LastRevRecoveryTest.testRecover() - simplified and extended with journal related asserts **/ + @Test + public void lastRevRecoveryJournalTest() throws Exception { + doLastRevRecoveryJournalTest(false); + } + + /** Inspired by LastRevRecoveryTest.testRecover() - simplified and extended with journal related asserts **/ + @Test + public void lastRevRecoveryJournalTestWithConcurrency() throws Exception { + doLastRevRecoveryJournalTest(true); + } + + private void doLastRevRecoveryJournalTest(boolean testConcurrency) throws Exception { + DocumentMK mk1 = createMK(0 /*clusterId via clusterNodes collection*/, 0); + DocumentNodeStore ds1 = mk1.getNodeStore(); + int c1Id = ds1.getClusterId(); + DocumentMK mk2 = createMK(0 /*clusterId via clusterNodes collection*/, 0); + DocumentNodeStore ds2 = mk2.getNodeStore(); + final int c2Id = ds2.getClusterId(); + + // should have 1 each with just the root changed + assertJournalEntries(ds1, "{}"); + assertJournalEntries(ds2, "{}"); + assertEquals(1, countJournalEntries(ds1, 10)); + assertEquals(1, countJournalEntries(ds2, 10)); + + //1. Create base structure /x/y + NodeBuilder b1 = ds1.getRoot().builder(); + b1.child("x").child("y"); + ds1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY); + ds1.runBackgroundOperations(); + + //lastRev are persisted directly for new nodes. In case of + // updates they are persisted via background jobs + + //1.2 Get last rev populated for root node for ds2 + ds2.runBackgroundOperations(); + NodeBuilder b2 = ds2.getRoot().builder(); + b2.child("x").setProperty("f1","b1"); + ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY); + ds2.runBackgroundOperations(); + + //2. 
Add a new node /x/y/z + b2 = ds2.getRoot().builder(); + b2.child("x").child("y").child("z").setProperty("foo", "bar"); + ds2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + // refresh ds1 + ds1.runBackgroundOperations(); + + final NodeDocument z1 = getDocument(ds1, "/x/y/z"); + NodeDocument y1 = getDocument(ds1, "/x/y"); + final NodeDocument x1 = getDocument(ds1, "/x"); + + Revision head2 = ds2.getHeadRevision(); + + // lastRev should not be updated for cluster node 2 + assertNull(y1.getLastRev().get(c2Id)); + + final LastRevRecoveryAgent recovery = new LastRevRecoveryAgent(ds1); + + // besides the former root change, ds1 now also has one more entry: + final String change1 = "{\"x\":{\"y\":{}}}"; + assertJournalEntries(ds1, "{}", change1); + final String change2 = "{\"x\":{}}"; + assertJournalEntries(ds2, "{}", change2); + + + String change2b = "{\"x\":{\"y\":{\"z\":{}}}}"; + + if (!testConcurrency) { + // do not pass y1, but y1 should still be updated + recovery.recover(Iterators.forArray(x1,z1), c2Id); + + // post recovery, the lastRev should be updated for /x/y, /x and / + assertEquals(head2, getDocument(ds1, "/x/y").getLastRev().get(c2Id)); + assertEquals(head2, getDocument(ds1, "/x").getLastRev().get(c2Id)); + assertEquals(head2, getDocument(ds1, "/").getLastRev().get(c2Id)); + + // ds1 is unchanged, but ds2 has just been recovered, so it has one more: + assertJournalEntries(ds1, "{}", change1); // unchanged + assertJournalEntries(ds2, "{}", change2, change2b); + + // just some no-ops: + recovery.recover(c2Id); + recovery.recover(Iterators.emptyIterator(), c2Id); + assertJournalEntries(ds1, "{}", change1); // unchanged + assertJournalEntries(ds2, "{}", change2, change2b); + + } else { + + // do some concurrency testing as well, to check that concurrent + // recovery runs do not add the recovered journal entry more than once + final int NUM_THREADS = 200; + final CountDownLatch ready = new CountDownLatch(NUM_THREADS); + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch end = new CountDownLatch(NUM_THREADS); + final List<Exception> exceptions = synchronizedList(new ArrayList<Exception>()); + for (int i = 0; i < NUM_THREADS; i++) { + Thread th = new Thread(new Runnable() { + + @Override + public void run() { + try { + ready.countDown(); + start.await(); + recovery.recover(Iterators.forArray(x1,z1), c2Id); + } catch (Exception e) { + exceptions.add(e); + } finally { + end.countDown(); + } + } + + }); + th.start(); + } + ready.await(5, TimeUnit.SECONDS); + start.countDown(); + assertTrue(end.await(20, TimeUnit.SECONDS)); + assertJournalEntries(ds1, "{}", change1); // unchanged + assertJournalEntries(ds2, "{}", change2, change2b); + for (Exception ex : exceptions) { + throw ex; + } + } + } + + private DocumentMK createMK(int clusterId, int asyncDelay) { + if (ds == null) { + ds = new MemoryDocumentStore(); + } + if (bs == null) { + bs = new MemoryBlobStore(); + } + return createMK(clusterId, asyncDelay, ds, bs); + } +} diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java index fb03c67..1ca49fa 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModificationsTest.java @@ -35,6 +35,7 @@ import org.apache.jackrabbit.oak.plugins.document.util.MapFactory; import org.junit.Ignore; import org.junit.Test; +import static org.apache.jackrabbit.oak.plugins.document.UnsavedModifications.Snapshot.IGNORE; import static 
org.junit.Assert.assertEquals; public class UnsavedModificationsTest { @@ -96,7 +97,7 @@ public class UnsavedModificationsTest { public void run() { while (exceptions.isEmpty()) { try { - mod.persist(ns, new ReentrantLock()); + mod.persist(ns, IGNORE, new ReentrantLock()); Thread.sleep(10); } catch (Exception e) { exceptions.add(e); @@ -169,7 +170,7 @@ public class UnsavedModificationsTest { paths.clear(); } if (random.nextFloat() < 0.00005) { - pending.persist(ns, new ReentrantLock()); + pending.persist(ns, IGNORE, new ReentrantLock()); } } } @@ -220,7 +221,7 @@ public class UnsavedModificationsTest { } // drain pending, this will force it back to in-memory - pending.persist(ns, new ReentrantLock()); + pending.persist(ns, IGNORE, new ReentrantLock()); // loop over remaining paths while (paths.hasNext()) { @@ -257,7 +258,7 @@ public class UnsavedModificationsTest { } // drain pending, this will force it back to in-memory - pending.persist(ns, new ReentrantLock()); + pending.persist(ns, IGNORE, new ReentrantLock()); // loop over remaining paths while (paths.hasNext()) { diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java new file mode 100644 index 0000000..15be0d9 --- /dev/null +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/JournalIT.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.plugins.document.mongo; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import com.mongodb.DB; + +import org.apache.jackrabbit.oak.cache.CacheStats; +import org.apache.jackrabbit.oak.plugins.document.AbstractJournalTest; +import org.apache.jackrabbit.oak.plugins.document.DocumentMK; +import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; +import org.apache.jackrabbit.oak.plugins.document.DocumentStore; +import org.apache.jackrabbit.oak.plugins.document.JournalGarbageCollector; +import org.apache.jackrabbit.oak.plugins.document.MongoUtils; +import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection; +import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore; +import org.apache.jackrabbit.oak.stats.Clock; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; + +public class JournalIT extends AbstractJournalTest { + + private static final Logger LOG = LoggerFactory.getLogger(JournalIT.class); + + @BeforeClass + public static void checkMongoDbAvailable() { + Assume.assumeNotNull(MongoUtils.getConnection()); + } + + @Before + @After + public void dropCollections() throws Exception { + MongoConnection mongoConnection = MongoUtils.getConnection(); + MongoUtils.dropCollections(mongoConnection.getDB()); + mongoConnection.close(); + } + + @Test + public void cacheInvalidationTest() throws Exception { + final DocumentNodeStore ns1 = createMK(1, 0).getNodeStore(); + final DocumentNodeStore ns2 = createMK(2, 0).getNodeStore(); + LOG.info("cache size 1: "+(ns1.getDocumentStore().getCacheStats()==null ? "null" : ns1.getDocumentStore().getCacheStats().getElementCount())); + + // invalidate both caches under test first + invalidateDocChildrenCache(ns1); + ns1.getDocumentStore().invalidateCache(); + + { + DocumentStore s = ns1.getDocumentStore(); + CacheStats cacheStats = s.getCacheStats(); + LOG.info("m.size="+(cacheStats==null ? "null" : cacheStats.getElementCount())); + } + LOG.info("cache size 2: "+(ns1.getDocumentStore().getCacheStats()==null ? "null" : ns1.getDocumentStore().getCacheStats().getElementCount())); + + // first create many nodes in instance 1 + final List<String> paths = createRandomPaths(1, 5000000, 1000); + int i=0; + for(String path : paths) { + if (i++%100==0) { + LOG.info("at "+i); + } + getOrCreate(ns1, path, false); + } + final List<String> paths2 = createRandomPaths(20, 2345, 100); + getOrCreate(ns1, paths2, false); + ns1.runBackgroundOperations(); + for(String path : paths) { + assertDocCache(ns1, true, path); + } + + { + DocumentStore s = ns1.getDocumentStore(); + CacheStats cacheStats = s.getCacheStats(); + LOG.info("m.size="+(cacheStats==null ? "null" : cacheStats.getElementCount())); + } + + LOG.info("cache size 2: "+(ns1.getDocumentStore().getCacheStats()==null ? 
"null" : ns1.getDocumentStore().getCacheStats().getElementCount())); + long time = System.currentTimeMillis(); + for(int j=0; j<100; j++) { + long now = System.currentTimeMillis(); + LOG.info("loop "+j+", "+(now-time)+"ms"); + time = now; + final Set electedPaths = choose(paths2, random.nextInt(30)); + { + // choose a random few from above created paths and modify them + final long t1 = System.currentTimeMillis(); + ns2.runBackgroundOperations(); // make sure ns2 has the latest from ns1 + final long t2 = System.currentTimeMillis(); + LOG.info("ns2 background took "+(t2-t1)+"ms"); + + for(String electedPath : electedPaths) { + // modify /child in another instance 2 + setProperty(ns2, electedPath, "p", "ns2"+System.currentTimeMillis(), false); + } + final long t3 = System.currentTimeMillis(); + LOG.info("setting props "+(t3-t2)+"ms"); + + ns2.runBackgroundOperations(); + final long t4 = System.currentTimeMillis(); + LOG.info("ns2 background took2 "+(t4-t3)+"ms"); + } + + // that should not have changed the fact that we have it cached in 'ns1' + for(String electedPath : electedPaths) { + assertDocCache(ns1, true, electedPath); + } + + // doing a backgroundOp now should trigger invalidation + // which thx to the external modification will remove the entry from the cache: + ns1.runBackgroundOperations(); + for(String electedPath : electedPaths) { + assertDocCache(ns1, false, electedPath); + } + + // when I access it again with 'ns1', then it gets cached again: + for(String electedPath : electedPaths) { + getOrCreate(ns1, electedPath, false); + assertDocCache(ns1, true, electedPath); + } + } + } + + @Test + public void largeCleanupTest() throws Exception { + // create more than DELETE_BATCH_SIZE of entries and clean them up + // should make sure to loop in JournalGarbageCollector.gc such + // that it would find issue described here: + // https://issues.apache.org/jira/browse/OAK-2829?focusedCommentId=14585733&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14585733 + + doLargeCleanupTest(0, 100); + doLargeCleanupTest(200, 1000);// using offset as to not make sure to always create new entries + doLargeCleanupTest(2000, 10000); + doLargeCleanupTest(20000, 30000); // using 'size' much larger than 30k will be tremendously slow due to ordered node + } + + @Test + public void simpleCacheInvalidationTest() throws Exception { + final DocumentNodeStore ns1 = createMK(1, 0).getNodeStore(); + final DocumentNodeStore ns2 = createMK(2, 0).getNodeStore(); + + // invalidate both caches under test first + invalidateDocChildrenCache(ns1); + ns1.getDocumentStore().invalidateCache(); + + // first create child node in instance 1 + getOrCreate(ns1, "/child", true); + assertDocCache(ns1, true, "/child"); + + { + // modify /child in another instance 2 + ns2.runBackgroundOperations(); // read latest changes from ns1 + setProperty(ns2, "/child", "p", "ns2"+System.currentTimeMillis(), true); + } + // that should not have changed the fact that we have it cached in 'ns' + assertDocCache(ns1, true, "/child"); + + // doing a backgroundOp now should trigger invalidation + // which thx to the external modification will remove the entry from the cache: + ns1.runBackgroundOperations(); + assertDocCache(ns1, false, "/child"); + + // when I access it again with 'ns', then it gets cached again: + getOrCreate(ns1, "/child", false); + assertDocCache(ns1, true, "/child"); + } + + private void doLargeCleanupTest(int offset, int size) throws Exception { + Clock clock = new Clock.Virtual(); + DocumentMK mk1 
= createMK(0 /* clusterId: 0 => uses clusterNodes collection */, 0, + new MemoryDocumentStore(), new MemoryBlobStore()); + DocumentNodeStore ns1 = mk1.getNodeStore(); + // make sure we're visible and marked as active + renewClusterIdLease(ns1); + JournalGarbageCollector gc = new JournalGarbageCollector(ns1); + clock.getTimeIncreasing(); + clock.getTimeIncreasing(); + gc.gc(0, TimeUnit.MILLISECONDS); // clean up everything that might still be there + + // create entries as parametrized: + for (int i = offset; i < size; i++) { fixtures) { @@ -58,9 +75,21 @@ if (fixture.isAvailable(1)) { System.out.format("%s: Observation throughput benchmark%n", fixture); try { - Repository[] cluster = fixture.setUpCluster(1); + final AtomicReference<Whiteboard> whiteboardRef = new AtomicReference<Whiteboard>(); + Repository[] cluster; + if (fixture instanceof OakRepositoryFixture) { + cluster = ((OakRepositoryFixture) fixture).setUpCluster(1, new JcrCreator() { + @Override + public Jcr customize(Oak oak) { + whiteboardRef.set(oak.getWhiteboard()); + return new Jcr(oak); + } + }); + } else { + cluster = fixture.setUpCluster(1); + } try { - run(cluster[0]); + run(cluster[0], whiteboardRef.get()); } finally { fixture.tearDownCluster(); } @@ -71,87 +100,159 @@ } } - private void run(Repository repository) throws RepositoryException, ExecutionException, InterruptedException { + private void run(Repository repository, @Nullable Whiteboard whiteboard) + throws RepositoryException, ExecutionException, InterruptedException { Session session = createSession(repository); long t0 = System.currentTimeMillis(); try { - observationThroughput(repository); + observationThroughput(repository, whiteboard); } finally { System.out.println("Time elapsed: " + (System.currentTimeMillis() - t0) + " ms"); session.logout(); } } - public void observationThroughput(final Repository repository) + public void observationThroughput(final Repository repository, + @Nullable Whiteboard whiteboard) throws RepositoryException, InterruptedException, ExecutionException { long t = 0; final AtomicInteger eventCount = new AtomicInteger(); final AtomicInteger nodeCount = new AtomicInteger(); - Session[] sessions = new Session[LISTENER_COUNT]; - EventListener[] listeners = new Listener[LISTENER_COUNT]; + List<Session> sessions = Lists.newArrayList(); + List<EventListener> listeners = Lists.newArrayList(); + + List<String> testPaths = Lists.newArrayList(); + Session s = createSession(repository); + String path = "/path/to/observation/benchmark-" + AbstractTest.TEST_ID; + try { + Node testRoot = JcrUtils.getOrCreateByPath(path, null, s); + for (int i = 0; i < WRITER_COUNT; i++) { + testPaths.add(testRoot.addNode("session-" + i).getPath()); + } + s.save(); + } finally { + s.logout(); + } + String pathFilter = PATH_FILTER == null ? 
path : PATH_FILTER; + System.out.println("Path filter for event listener: " + pathFilter); + ExecutorService service = Executors.newFixedThreadPool(WRITER_COUNT); try { for (int k = 0; k < LISTENER_COUNT; k++) { - sessions[k] = createSession(repository); - listeners[k] = new Listener(eventCount); - ObservationManager obsMgr = sessions[k].getWorkspace().getObservationManager(); - obsMgr.addEventListener(listeners[k], EVENT_TYPES, "/", true, null, null, false); + sessions.add(createSession(repository)); + listeners.add(new Listener(eventCount)); + ObservationManager obsMgr = sessions.get(k).getWorkspace().getObservationManager(); + obsMgr.addEventListener(listeners.get(k), EVENT_TYPES, pathFilter, true, null, null, false); } + // also add a listener on the root node + addRootListener(repository, sessions, listeners); - Future createNodes = Executors.newSingleThreadExecutor().submit(new Runnable() { - private final Session session = repository.login(new SimpleCredentials("admin", "admin".toCharArray())); + List<Future<Object>> createNodes = Lists.newArrayList(); + for (final String p : testPaths) { + createNodes.add(service.submit(new Callable<Object>() { + private final Session session = createSession(repository); + private int numNodes = 0; - @Override - public void run() { - try { - Node testRoot = session.getRootNode().addNode("observationBenchmark"); - createChildren(testRoot, 100); - for (Node m : JcrUtils.getChildNodes(testRoot)) { - createChildren(m, 100); - for (Node n : JcrUtils.getChildNodes(m)) { - createChildren(n, 5); + @Override + public Object call() throws Exception { + try { + Node testRoot = session.getNode(p); + createChildren(testRoot, 100); + for (Node m : JcrUtils.getChildNodes(testRoot)) { + createChildren(m, 100 / WRITER_COUNT); + for (Node n : JcrUtils.getChildNodes(m)) { + createChildren(n, 5); + } } + session.save(); + } finally { + session.logout(); } - session.save(); - } catch (RepositoryException e) { - throw new RuntimeException(e); - } finally { - session.logout(); + return null; } - } - private void createChildren(Node node, int count) throws RepositoryException { - for (int c = 0; c < count; c++) { - node.addNode("n" + c); - if (nodeCount.incrementAndGet() % SAVE_INTERVAL == 0) { - node.getSession().save(); + private void createChildren(Node node, int count) + throws RepositoryException { + for (int c = 0; c < count; c++) { + node.addNode("n" + c); + nodeCount.incrementAndGet(); + if (++numNodes % SAVE_INTERVAL == 0) { + node.getSession().save(); + } } } - } - }); + })); + } - System.out.println("ms #node nodes/s #event event/s event ratio"); - while (!createNodes.isDone() || (eventCount.get() < nodeCount.get() * EVENTS_PER_NODE)) { + System.out.println("ms #node nodes/s #event event/s event-ratio queue external"); + while (!isDone(createNodes) || (eventCount.get() / LISTENER_COUNT < nodeCount.get() * EVENTS_PER_NODE)) { long t0 = System.currentTimeMillis(); Thread.sleep(OUTPUT_RESOLUTION); t += System.currentTimeMillis() - t0; int nc = nodeCount.get(); int ec = eventCount.get() / LISTENER_COUNT; + int[] ql = getObservationQueueLength(whiteboard); double nps = (double) nc / t * 1000; double eps = (double) ec / t * 1000; double epn = (double) ec / nc / EVENTS_PER_NODE; - System.out.format("%7d %7d %7.1f %7d %7.1f %1.2f%n", t, nc, nps, ec, eps, epn); + System.out.format( + "%7d %7d %7.1f %7d %7.1f %7.2f %7d %7d%n", + t, nc, nps, ec, eps, epn, ql[0], ql[1]); } - createNodes.get(); + get(createNodes); } finally { - for (int k = 0; k < LISTENER_COUNT; k++) { - 
sessions[k].getWorkspace().getObservationManager().removeEventListener(listeners[k]); - sessions[k].logout(); + for (int k = 0; k < sessions.size(); k++) { + sessions.get(k).getWorkspace().getObservationManager() + .removeEventListener(listeners.get(k)); + sessions.get(k).logout(); } + service.shutdown(); + service.awaitTermination(1, TimeUnit.MINUTES); + } + } + + private void addRootListener(Repository repository, + List<Session> sessions, + List<EventListener> listeners) + throws RepositoryException { + Session s = createSession(repository); + sessions.add(s); + Listener listener = new Listener(new AtomicInteger()); + ObservationManager obsMgr = s.getWorkspace().getObservationManager(); + obsMgr.addEventListener(listener, EVENT_TYPES, "/", true, null, null, false); + listeners.add(listener); + } + + private static int[] getObservationQueueLength(@Nullable Whiteboard wb) { + if (wb == null) { + return new int[]{-1, -1}; + } + int len = -1; + int ext = -1; + for (BackgroundObserverMBean bean : getServices(wb, BackgroundObserverMBean.class)) { + len = Math.max(bean.getQueueSize(), len); + ext = Math.max(bean.getExternalEventCount(), ext); + } + return new int[]{len, ext}; + } + + private static boolean isDone(Iterable<Future<Object>> futures) { + for (Future<Object> f : futures) { + if (!f.isDone()) { + return false; + } + } + return true; + } + + private static void get(Iterable<Future<Object>> futures) + throws ExecutionException, InterruptedException { + for (Future<Object> f : futures) { + f.get(); } } diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java new file mode 100644 index 0000000..e719bfe --- /dev/null +++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/JcrCreator.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.jackrabbit.oak.fixture; + +import org.apache.jackrabbit.oak.Oak; +import org.apache.jackrabbit.oak.jcr.Jcr; + +public interface JcrCreator { + JcrCreator DEFAULT = new JcrCreator() { + @Override + public Jcr customize(Oak oak) { + return new Jcr(oak); + } + }; + + public Jcr customize(Oak oak); +} \ No newline at end of file diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java index 36453c1..36979a0 100644 --- a/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java +++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java @@ -17,6 +17,7 @@ package org.apache.jackrabbit.oak.fixture; import java.io.File; + import javax.jcr.Repository; import org.apache.jackrabbit.api.JackrabbitRepository; @@ -90,10 +91,14 @@ public class OakRepositoryFixture implements RepositoryFixture { @Override public final Repository[] setUpCluster(int n) throws Exception { + return setUpCluster(n, JcrCreator.DEFAULT); + } + + public Repository[] setUpCluster(int n, JcrCreator customizer) throws Exception { Oak[] oaks = oakFixture.setUpCluster(n); cluster = new Repository[oaks.length]; for (int i = 0; i < oaks.length; i++) { - cluster[i] = new Jcr(oaks[i]).createRepository();; + cluster[i] = customizer.customize(oaks[i]).createRepository(); } return cluster; } -- 1.8.4.2
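Usage sketch for the JcrCreator hook introduced above: it lets benchmark code customize how the JCR repository is assembled per cluster node, for example to capture the Oak Whiteboard the way ObservationTest does. This is a minimal sketch, assuming an OakRepositoryFixture instance named "fixture" has already been configured by the harness (the variable names are illustrative, not part of the patch):

    import java.util.concurrent.atomic.AtomicReference;
    import javax.jcr.Repository;
    import org.apache.jackrabbit.oak.Oak;
    import org.apache.jackrabbit.oak.jcr.Jcr;
    import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;

    // capture the Whiteboard of the single cluster node while it is built
    final AtomicReference<Whiteboard> wb = new AtomicReference<Whiteboard>();
    Repository[] cluster = fixture.setUpCluster(1, new JcrCreator() {
        @Override
        public Jcr customize(Oak oak) {
            wb.set(oak.getWhiteboard()); // grab internals before the Jcr wrapper is built
            return new Jcr(oak);         // otherwise identical to JcrCreator.DEFAULT
        }
    });

Callers that do not need to customize anything keep using setUpCluster(int n), which now simply delegates to setUpCluster(n, JcrCreator.DEFAULT).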