From 905e30ab1f9442475f83e544ae76a1a49eb58d8f Mon Sep 17 00:00:00 2001 From: Vikas Saurabh Date: Thu, 8 Dec 2016 15:11:21 +0530 Subject: [PATCH] OAK-3976: journal should support large(r) entries JournalEntry tracks and provides number of modified paths and if it had any branch commits. If number of paths reaches force-push-threshold (configurable), then journal entry is pushed. --- .../oak/plugins/document/DocumentNodeStore.java | 37 +++++++++++++----- .../oak/plugins/document/JournalEntry.java | 29 ++++++++++++++ .../plugins/document/DocumentNodeStoreTest.java | 24 ++++++++++++ .../oak/plugins/document/JournalEntryTest.java | 45 ++++++++++++++++++++++ 4 files changed, 126 insertions(+), 9 deletions(-) diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java index 04ecfd7..f68be53 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java @@ -185,6 +185,13 @@ public final class DocumentNodeStore private boolean disableJournalDiff = Boolean.getBoolean(SYS_PROP_DISABLE_JOURNAL); + public static final String SYS_PROP_JOURNAL_PUSH_THRESHOLD = "oak.journalPushThreshold"; + /** + * Threshold for number of paths in journal entry to require a force push during commit + * (instead of at background write) + */ + static int journalPushThreshold = 100000; //non-final to allow for tests. + /** * The document store (might be used by multiple node stores). */ @@ -789,7 +796,13 @@ public final class DocumentNodeStore changes.modified(c.getModifiedPaths()); changes.addChangeSet(getChangeSet(info)); // update head revision - newHead[0] = before.update(c.getRevision()); + Revision r = c.getRevision(); + newHead[0] = before.update(r); + if (changes.getNumChangedPaths() >= journalPushThreshold) { + LOG.info("Pushing journal entry at {} as number of changes ({}) have reached {}", + r, changes.getNumChangedPaths(), journalPushThreshold); + pushJournalEntry(r); + } setRoot(newHead[0]); commitQueue.headRevisionChanged(); dispatcher.contentChanged(getRoot(), info); @@ -2142,20 +2155,26 @@ public final class DocumentNodeStore return unsavedLastRevisions.persist(this, new UnsavedModifications.Snapshot() { @Override public void acquiring(Revision mostRecent) { - if (store.create(JOURNAL, singletonList(changes.asUpdateOp(mostRecent)))) { - // success: start with a new document - changes = newJournalEntry(); - } else { - // fail: log and keep the changes - LOG.error("Failed to write to journal, accumulating changes for future write (~" + changes.getMemory() - + " bytes)."); - } + pushJournalEntry(mostRecent); } }, backgroundOperationLock.writeLock()); } //-----------------------------< internal >--------------------------------- + void pushJournalEntry(Revision r) { + if (!changes.hasChanges()) { + LOG.debug("Not pushing journal as there are no changes"); + } else if (store.create(JOURNAL, singletonList(changes.asUpdateOp(r)))) { + // success: start with a new document + changes = newJournalEntry(); + } else { + // fail: log and keep the changes + LOG.error("Failed to write to journal({}), accumulating changes for future write (~{} bytes, {} paths)", + r, changes.getMemory(), changes.getNumChangedPaths()); + } + } + /** * Returns the binary size of a property value represented as a JSON or * {@code -1} if the property is not of type binary. diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java index e3aab49..f3e3e01 100644 --- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java +++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/JournalEntry.java @@ -86,6 +86,17 @@ public final class JournalEntry extends Document { private volatile TreeNode changes = null; + /** + * Counts number of paths changed due to {@code modified()} calls. + * Applicable for entries being prepared to be persisted. + */ + private volatile int numChangedPaths = 0; + /** + * Tracks if this entry has branch commits or not + * Applicable for entries being prepared to be persisted. + */ + private boolean hasBranchCommits = false; + private boolean concurrent; JournalEntry(DocumentStore store) { @@ -316,6 +327,9 @@ public final class JournalEntry extends Document { void modified(String path) { TreeNode node = getChanges(); for (String name : PathUtils.elements(path)) { + if (node.get(name) == null) { + numChangedPaths++; + } node = node.getOrCreate(name); } } @@ -364,6 +378,7 @@ public final class JournalEntry extends Document { branchCommits += ","; } branchCommits += asId(r.asBranchRevision()); + hasBranchCommits = true; } put(BRANCH_COMMITS, branchCommits); } @@ -455,6 +470,20 @@ public final class JournalEntry extends Document { }; } + /** + * @return number of modified paths being tracked by this journal entry. + */ + int getNumChangedPaths() { + return numChangedPaths; + } + + /** + * @return if this entry has some changes to be pushed + */ + boolean hasChanges() { + return numChangedPaths > 0 || hasBranchCommits; + } + //-----------------------------< internal >--------------------------------- private static String getChanges(TreeNode node) { diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java index 48834bb..9af055a 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java @@ -2868,6 +2868,30 @@ public class DocumentNodeStoreTest { } } + @Test + public void forceJournalFlush() throws Exception { + int oldJournalPushThreshold = DocumentNodeStore.journalPushThreshold; + DocumentNodeStore.journalPushThreshold = 2; + try { + DocumentNodeStore ns = builderProvider.newBuilder().setAsyncDelay(0).getNodeStore(); + int numChangedPaths; + + NodeBuilder builder = ns.getRoot().builder(); + builder.child("foo"); + merge(ns, builder); + numChangedPaths = ns.getCurrentJournalEntry().getNumChangedPaths(); + assertTrue("Single paths change shouldn't flush", numChangedPaths > 0); + + builder = ns.getRoot().builder(); + builder.child("bar"); + merge(ns, builder); + numChangedPaths = ns.getCurrentJournalEntry().getNumChangedPaths(); + assertTrue("Two added paths should have forced flush", numChangedPaths == 0); + } finally { + DocumentNodeStore.journalPushThreshold = oldJournalPushThreshold; + } + } + private static class TestException extends RuntimeException { } diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java index 92a8b9e..0381044 100644 --- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java +++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/JournalEntryTest.java @@ -17,6 +17,7 @@ package org.apache.jackrabbit.oak.plugins.document; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Random; @@ -311,6 +312,50 @@ public class JournalEntryTest { sort.close(); } + @Test + public void countUpdatedPaths() { + DocumentStore store = new MemoryDocumentStore(); + JournalEntry entry = JOURNAL.newDocument(store); + + assertEquals("Incorrect number of initial paths", 0, entry.getNumChangedPaths()); + assertFalse("Incorrect hasChanhes", entry.hasChanges()); + + entry.modified("/foo"); + entry.modified("/bar"); + assertEquals("Incorrect number of paths", 2, entry.getNumChangedPaths()); + assertTrue("Incorrect hasChanhes", entry.hasChanges()); + + entry.modified(Arrays.asList("/foo1", "/bar1")); + assertEquals("Incorrect number of paths", 4, entry.getNumChangedPaths()); + assertTrue("Incorrect hasChanhes", entry.hasChanges()); + + entry.modified("/foo/bar2"); + assertEquals("Incorrect number of paths", 5, entry.getNumChangedPaths()); + assertTrue("Incorrect hasChanhes", entry.hasChanges()); + + entry.modified("/foo3/bar3"); + assertEquals("Incorrect number of paths", 7, entry.getNumChangedPaths()); + assertTrue("Incorrect hasChanhes", entry.hasChanges()); + + entry.modified(Arrays.asList("/foo/bar4", "/foo5/bar5")); + assertEquals("Incorrect number of paths", 10, entry.getNumChangedPaths()); + assertTrue("Incorrect hasChanhes", entry.hasChanges()); + } + + @Test + public void branchAdditionMarksChanges() { + DocumentStore store = new MemoryDocumentStore(); + JournalEntry entry = JOURNAL.newDocument(store); + + assertFalse("Incorrect hasChanhes", entry.hasChanges()); + + entry.branchCommit(Collections.EMPTY_LIST); + assertFalse("Incorrect hasChanhes", entry.hasChanges()); + + entry.branchCommit(Arrays.asList(Revision.fromString("r123-0-1"))); + assertTrue("Incorrect hasChanhes", entry.hasChanges()); + } + private static void addRandomPaths(java.util.Collection paths) throws IOException { paths.add("/"); Random random = new Random(42); -- 2.8.3