Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1774040) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy) @@ -186,6 +186,12 @@ Boolean.getBoolean(SYS_PROP_DISABLE_JOURNAL); /** + * Threshold for number of paths in journal entry to require a force push during commit + * (instead of at background write) + */ + private int journalPushThreshold = Integer.getInteger("oak.journalPushThreshold", 100000); + + /** * The document store (might be used by multiple node stores). */ protected final DocumentStore store; @@ -789,7 +795,13 @@ changes.modified(c.getModifiedPaths()); changes.addChangeSet(getChangeSet(info)); // update head revision - newHead[0] = before.update(c.getRevision()); + Revision r = c.getRevision(); + newHead[0] = before.update(r); + if (changes.getNumChangedPaths() >= journalPushThreshold) { + LOG.info("Pushing journal entry at {} as number of changes ({}) have reached {}", + r, changes.getNumChangedPaths(), journalPushThreshold); + pushJournalEntry(r); + } setRoot(newHead[0]); commitQueue.headRevisionChanged(); dispatcher.contentChanged(getRoot(), info); @@ -861,6 +873,14 @@ return enableConcurrentAddRemove; } + int getJournalPushThreshold() { + return journalPushThreshold; + } + + void setJournalPushThreshold(int journalPushThreshold) { + this.journalPushThreshold = journalPushThreshold; + } + @Nonnull public ClusterNodeInfo getClusterInfo() { return clusterNodeInfo; @@ -2142,14 +2162,7 @@ return unsavedLastRevisions.persist(this, new UnsavedModifications.Snapshot() { @Override public void acquiring(Revision mostRecent) { - if (store.create(JOURNAL, singletonList(changes.asUpdateOp(mostRecent)))) { - // success: start with a new document - changes = newJournalEntry(); - } else { - // fail: log and keep the changes - LOG.error("Failed to write to journal, accumulating changes for future write (~" + changes.getMemory() - + " bytes)."); - } + pushJournalEntry(mostRecent); } }, backgroundOperationLock.writeLock()); } @@ -2156,6 +2169,19 @@ //-----------------------------< internal >--------------------------------- + void pushJournalEntry(Revision r) { + if (!changes.hasChanges()) { + LOG.debug("Not pushing journal as there are no changes"); + } else if (store.create(JOURNAL, singletonList(changes.asUpdateOp(r)))) { + // success: start with a new document + changes = newJournalEntry(); + } else { + // fail: log and keep the changes + LOG.error("Failed to write to journal({}), accumulating changes for future write (~{} bytes, {} paths)", + r, changes.getMemory(), changes.getNumChangedPaths()); + } + } + /** * Returns the binary size of a property value represented as a JSON or * {@code -1} if the property is not of type binary. Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java (revision 1774040) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java (working copy) @@ -86,6 +86,17 @@ private volatile TreeNode changes = null; + /** + * Counts number of paths changed due to {@code modified()} calls. + * Applicable for entries being prepared to be persisted. + */ + private volatile int numChangedPaths = 0; + /** + * Tracks if this entry has branch commits or not + * Applicable for entries being prepared to be persisted. + */ + private boolean hasBranchCommits = false; + private boolean concurrent; JournalEntry(DocumentStore store) { @@ -316,6 +327,9 @@ void modified(String path) { TreeNode node = getChanges(); for (String name : PathUtils.elements(path)) { + if (node.get(name) == null) { + numChangedPaths++; + } node = node.getOrCreate(name); } } @@ -364,6 +378,7 @@ branchCommits += ","; } branchCommits += asId(r.asBranchRevision()); + hasBranchCommits = true; } put(BRANCH_COMMITS, branchCommits); } @@ -455,6 +470,20 @@ }; } + /** + * @return number of modified paths being tracked by this journal entry. + */ + int getNumChangedPaths() { + return numChangedPaths; + } + + /** + * @return if this entry has some changes to be pushed + */ + boolean hasChanges() { + return numChangedPaths > 0 || hasBranchCommits; + } + //-----------------------------< internal >--------------------------------- private static String getChanges(TreeNode node) { Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (revision 1774040) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java (working copy) @@ -65,7 +65,6 @@ import javax.annotation.CheckForNull; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; @@ -2868,6 +2867,25 @@ } } + @Test + public void forceJournalFlush() throws Exception { + DocumentNodeStore ns = builderProvider.newBuilder().setAsyncDelay(0).getNodeStore(); + ns.setJournalPushThreshold(2); + int numChangedPaths; + + NodeBuilder builder = ns.getRoot().builder(); + builder.child("foo"); + merge(ns, builder); + numChangedPaths = ns.getCurrentJournalEntry().getNumChangedPaths(); + assertTrue("Single path change shouldn't flush", numChangedPaths > 0); + + builder = ns.getRoot().builder(); + builder.child("bar"); + merge(ns, builder); + numChangedPaths = ns.getCurrentJournalEntry().getNumChangedPaths(); + assertTrue("Two added paths should have forced flush", numChangedPaths == 0); + } + private static class TestException extends RuntimeException { } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java (revision 1774040) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java (working copy) @@ -17,6 +17,7 @@ package org.apache.jackrabbit.oak.plugins.document; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Random; @@ -311,6 +312,50 @@ sort.close(); } + @Test + public void countUpdatedPaths() { + DocumentStore store = new MemoryDocumentStore(); + JournalEntry entry = JOURNAL.newDocument(store); + + assertEquals("Incorrect number of initial paths", 0, entry.getNumChangedPaths()); + assertFalse("Incorrect hasChanges", entry.hasChanges()); + + entry.modified("/foo"); + entry.modified("/bar"); + assertEquals("Incorrect number of paths", 2, entry.getNumChangedPaths()); + assertTrue("Incorrect hasChanges", entry.hasChanges()); + + entry.modified(Arrays.asList("/foo1", "/bar1")); + assertEquals("Incorrect number of paths", 4, entry.getNumChangedPaths()); + assertTrue("Incorrect hasChanges", entry.hasChanges()); + + entry.modified("/foo/bar2"); + assertEquals("Incorrect number of paths", 5, entry.getNumChangedPaths()); + assertTrue("Incorrect hasChanges", entry.hasChanges()); + + entry.modified("/foo3/bar3"); + assertEquals("Incorrect number of paths", 7, entry.getNumChangedPaths()); + assertTrue("Incorrect hasChanges", entry.hasChanges()); + + entry.modified(Arrays.asList("/foo/bar4", "/foo5/bar5")); + assertEquals("Incorrect number of paths", 10, entry.getNumChangedPaths()); + assertTrue("Incorrect hasChanges", entry.hasChanges()); + } + + @Test + public void branchAdditionMarksChanges() { + DocumentStore store = new MemoryDocumentStore(); + JournalEntry entry = JOURNAL.newDocument(store); + + assertFalse("Incorrect hasChanges", entry.hasChanges()); + + entry.branchCommit(Collections.emptyList()); + assertFalse("Incorrect hasChanges", entry.hasChanges()); + + entry.branchCommit(Collections.singleton(Revision.fromString("r123-0-1"))); + assertTrue("Incorrect hasChanges", entry.hasChanges()); + } + private static void addRandomPaths(java.util.Collection paths) throws IOException { paths.add("/"); Random random = new Random(42); Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalGCTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalGCTest.java (revision 1774040) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalGCTest.java (working copy) @@ -102,10 +102,8 @@ c.waitUntil(c.getTime() + TimeUnit.HOURS.toMillis(1)); - // must collect all journal entries. the first created when - // DocumentNodeStore was initialized and the second created - // by the background update - assertEquals(2, jgc.gc(1, TimeUnit.HOURS)); + // must collect the journal entry created by the background update + assertEquals(1, jgc.gc(1, TimeUnit.HOURS)); // current time, but without the increment done by getTime() now = c.getTime() - 1; Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java (revision 1774040) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalTest.java (working copy) @@ -337,11 +337,9 @@ DocumentNodeStore ds2 = mk2.getNodeStore(); final int c2Id = ds2.getClusterId(); - // should have 1 each with just the root changed - assertJournalEntries(ds1, "{}"); - assertJournalEntries(ds2, "{}"); - assertEquals(1, countJournalEntries(ds1, 10)); - assertEquals(1, countJournalEntries(ds2, 10)); + // should have none yet + assertEquals(0, countJournalEntries(ds1, 10)); + assertEquals(0, countJournalEntries(ds2, 10)); //1. Create base structure /x/y NodeBuilder b1 = ds1.getRoot().builder(); @@ -381,11 +379,11 @@ final LastRevRecoveryAgent recovery = new LastRevRecoveryAgent(ds1); - // besides the former root change, now 1 also has + // now 1 also has final String change1 = "{\"x\":{\"y\":{}}}"; - assertJournalEntries(ds1, "{}", change1); + assertJournalEntries(ds1, change1); final String change2 = "{\"x\":{}}"; - assertJournalEntries(ds2, "{}", change2); + assertJournalEntries(ds2, change2); String change2b = "{\"x\":{\"y\":{\"z\":{}}}}"; @@ -400,14 +398,14 @@ assertEquals(head2, getDocument(ds1, "/").getLastRev().get(c2Id)); // now 1 is unchanged, but 2 was recovered now, so has one more: - assertJournalEntries(ds1, "{}", change1); // unchanged - assertJournalEntries(ds2, "{}", change2, change2b); + assertJournalEntries(ds1, change1); // unchanged + assertJournalEntries(ds2, change2, change2b); // just some no-ops: recovery.recover(c2Id); recovery.recover(Iterators.emptyIterator(), c2Id); - assertJournalEntries(ds1, "{}", change1); // unchanged - assertJournalEntries(ds2, "{}", change2, change2b); + assertJournalEntries(ds1, change1); // unchanged + assertJournalEntries(ds2, change2, change2b); } else { @@ -439,8 +437,8 @@ ready.await(5, TimeUnit.SECONDS); start.countDown(); assertTrue(end.await(20, TimeUnit.SECONDS)); - assertJournalEntries(ds1, "{}", change1); // unchanged - assertJournalEntries(ds2, "{}", change2, change2b); + assertJournalEntries(ds1, change1); // unchanged + assertJournalEntries(ds2, change2, change2b); for (Exception ex : exceptions) { throw ex; }