From 346c8df89f56076d5165d977e2332d1db543ab68 Mon Sep 17 00:00:00 2001
From: Vikas Saurabh <vsaurabh@adobe.com>
Date: Thu, 23 Apr 2015 18:59:35 +0530
Subject: [PATCH] OAK-2793: Time limit for HierarchicalInvalidator

---
 .../plugins/document/mongo/CacheInvalidator.java   | 95 ++++++++++++++--------
 .../plugins/document/mongo/MongoDocumentStore.java | 20 ++++-
 .../document/mongo/CacheInvalidationIT.java        | 34 ++++++++
 3 files changed, 111 insertions(+), 38 deletions(-)

diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java
index d4b6a56..90a8dda 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidator.java
@@ -73,6 +73,7 @@ abstract class CacheInvalidator {
         long timeTaken;
         int queryCount;
         int cacheEntriesProcessedCount;
+        boolean invalidationTimedOut;
 
         @Override
         public String toString() {
@@ -83,6 +84,7 @@ abstract class CacheInvalidator {
                     ", timeTaken=" + timeTaken +
                     ", queryCount=" + queryCount +
                     ", cacheEntriesProcessedCount=" + cacheEntriesProcessedCount +
+                    ", invalidationTimedOut=" + invalidationTimedOut +
                     '}';
         }
 
@@ -176,10 +178,12 @@ abstract class CacheInvalidator {
 
         private final DBCollection nodes;
         private final MongoDocumentStore documentStore;
+        private final long cacheInvalidationTimeout;
 
         public HierarchicalInvalidator(MongoDocumentStore documentStore) {
             this.documentStore = documentStore;
             this.nodes = documentStore.getDBCollection(Collection.NODES);
+            this.cacheInvalidationTimeout = documentStore.getCacheInvalidationTimeout();
         }
 
         @Override
@@ -199,6 +203,8 @@ abstract class CacheInvalidator {
             final BasicDBObject keys = new BasicDBObject(Document.ID, 1);
             keys.put(Document.MOD_COUNT, 1);
 
+            boolean cacheInvalidationTimedOut = false;
+
             while (pitr.hasNext()) {
                 final TreeNode tn = pitr.next();
 
@@ -223,48 +229,54 @@ abstract class CacheInvalidator {
                 if (!sameLevelNodes.isEmpty() &&
                         ((hasMore && tn.level() != pitr.peek().level()) || !hasMore)) {
                     List<String> sameLevelNodeIds = new ArrayList<String>(sameLevelNodes.keySet());
-                    for(List<String> idBatch : Lists.partition(sameLevelNodeIds, IN_QUERY_BATCH_SIZE)) {
-
-                        QueryBuilder query = QueryBuilder.start(Document.ID)
-                                .in(idBatch);
-
-                        // Fetch lastRev and modCount for each such nodes
-                        DBCursor cursor = nodes.find(query.get(), keys);
-                        cursor.setReadPreference(ReadPreference.primary());
-                        LOG.debug(
-                                "Checking for changed nodes at level {} with {} paths",
-                                tn.level(), sameLevelNodes.size());
-                        result.queryCount++;
-                        try {
-                            for (DBObject obj : cursor) {
-
-                                result.cacheEntriesProcessedCount++;
-
-                                Number latestModCount = (Number) obj.get(Document.MOD_COUNT);
-                                String id = (String) obj.get(Document.ID);
-
-                                final TreeNode tn2 = sameLevelNodes.get(id);
-                                CachedNodeDocument cachedDoc = tn2.getDocument();
-                                if (cachedDoc != null) {
-                                    boolean noChangeInModCount = Objects.equal(latestModCount, cachedDoc.getModCount());
-                                    if (noChangeInModCount) {
-                                        result.upToDateCount++;
-                                        tn2.markUptodate(startTime);
-                                    } else {
-                                        result.invalidationCount++;
-                                        tn2.invalidate();
+                    if (!cacheInvalidationTimedOut) {
+                        for (List<String> idBatch : Lists.partition(sameLevelNodeIds, IN_QUERY_BATCH_SIZE)) {
+                            if (cacheInvalidationTimedOut) break;
+
+                            QueryBuilder query = QueryBuilder.start(Document.ID)
+                                    .in(idBatch);
+
+                            // Fetch lastRev and modCount for each such node
+                            DBCursor cursor = nodes.find(query.get(), keys);
+                            cursor.setReadPreference(ReadPreference.primary());
+                            LOG.debug(
+                                    "Checking for changed nodes at level {} with {} paths",
+                                    tn.level(), sameLevelNodes.size());
+                            result.queryCount++;
+                            try {
+                                for (DBObject obj : cursor) {
+                                    if (cacheInvalidationTimedOut) break;
+
+                                    result.cacheEntriesProcessedCount++;
+
+                                    Number latestModCount = (Number) obj.get(Document.MOD_COUNT);
+                                    String id = (String) obj.get(Document.ID);
+
+                                    final TreeNode tn2 = sameLevelNodes.get(id);
+                                    CachedNodeDocument cachedDoc = tn2.getDocument();
+                                    if (cachedDoc != null) {
+                                        boolean noChangeInModCount = Objects.equal(latestModCount, cachedDoc.getModCount());
+                                        if (noChangeInModCount) {
+                                            result.upToDateCount++;
+                                            tn2.markUptodate(startTime);
+                                        } else {
+                                            result.invalidationCount++;
+                                            tn2.invalidate();
+                                        }
                                     }
-                                }
 
-                                // Remove the processed nodes
-                                sameLevelNodes.remove(tn2.getId());
+                                    // Remove the processed nodes
+                                    sameLevelNodes.remove(tn2.getId());
+
+                                    cacheInvalidationTimedOut = cacheInvalidationTimedOut || !isTimeBound(startTime);
+                                }
+                            } finally {
+                                cursor.close();
                             }
-                        } finally {
-                            cursor.close();
                         }
                     }
 
-                    // NodeDocument present in cache but not in database
+                    // NodeDocument present in cache but not in database, or invalidation exceeded the timeout
                     // Remove such nodes from cache
                     if (!sameLevelNodes.isEmpty()) {
                         for (TreeNode leftOverNodes : sameLevelNodes.values()) {
@@ -278,6 +290,10 @@ abstract class CacheInvalidator {
 
             result.timeTaken = System.currentTimeMillis() - startTime;
             LOG.debug("Cache invalidation details - {}", result);
+            if (cacheInvalidationTimedOut) {
+                LOG.warn("Cache invalidation timed out - {}", result);
+                result.invalidationTimedOut = true;
+            }
 
             // TODO collect the list of ids which are invalidated such that entries for only those
             // ids are removed from the Document Children Cache
@@ -319,6 +335,13 @@ abstract class CacheInvalidator {
             return root;
         }
 
+        private boolean isTimeBound(long startTime) {
+            if (cacheInvalidationTimeout == -1) return true;
+
+            long timeTaken = System.currentTimeMillis() - startTime;
+            return timeTaken < cacheInvalidationTimeout;
+        }
+
         private class TreeNode {
             private final String name;
             private final TreeNode parent;
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
index 44d10bc..51070de1 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
@@ -173,6 +173,13 @@ public class MongoDocumentStore implements DocumentStore {
     private final long maxQueryTimeMS =
             Long.getLong("oak.mongo.maxQueryTimeMS", TimeUnit.MINUTES.toMillis(1));
 
+    /**
+     * Timeout value (in ms) that puts an upper bound on the amount of time spent doing
+     * cache invalidation. The default value of -1 disables the timeout. See OAK-2793.
+     */
+    private long cacheInvalidationTimeout =
+            Long.getLong("oak.mongo.cacheInvalidationTimeout", -1);
+
     private String lastReadWriteMode;
 
     private final Map<String, String> metadata;
@@ -235,8 +242,8 @@ public class MongoDocumentStore implements DocumentStore {
         cacheStats = new CacheStats(nodesCache, "Document-Documents", builder.getWeigher(),
                 builder.getDocumentCacheSize());
         LOG.info("Configuration maxReplicationLagMillis {}, " +
-                "maxDeltaForModTimeIdxSecs {}, disableIndexHint {}",
-                maxReplicationLagMillis, maxDeltaForModTimeIdxSecs, disableIndexHint);
+                "maxDeltaForModTimeIdxSecs {}, disableIndexHint {}, cacheInvalidationTimeout {}",
+                maxReplicationLagMillis, maxDeltaForModTimeIdxSecs, disableIndexHint, cacheInvalidationTimeout);
     }
 
     private static String checkVersion(DB db) {
@@ -1010,6 +1017,15 @@ public class MongoDocumentStore implements DocumentStore {
         return disableIndexHint;
     }
 
+    long getCacheInvalidationTimeout() {
+        return cacheInvalidationTimeout;
+    }
+
+    void setCacheInvalidationTimeout(long cacheInvalidationTimeout) {
+        this.cacheInvalidationTimeout = cacheInvalidationTimeout;
+        LOG.info("Setting cache invalidation timeout to {}", cacheInvalidationTimeout);
+    }
+
     Iterable<? extends Map.Entry<CacheValue, ? extends CachedNodeDocument>> getCacheEntries() {
         if (nodesCache instanceof OffHeapCache) {
             return Iterables.concat(nodesCache.asMap().entrySet(),
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidationIT.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidationIT.java
index cf0e107..02b874e 100644
--- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidationIT.java
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/CacheInvalidationIT.java
@@ -140,6 +140,40 @@ public class CacheInvalidationIT extends AbstractMongoConnectionTest {
         assertEquals(4, result.cacheEntriesProcessedCount);
     }
 
+    //OAK-2793
+    @Test
+    public void testCacheTimeoutInvalidatesAllEntries()
+        throws CommitFailedException {
+        createScenario();
+
+        NodeBuilder b2 = getRoot(c2).builder();
+        builder(b2, "/a/c").setProperty("foo", "bar");
+        c2.merge(b2, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        //Push pending changes at /a
+        c2.runBackgroundOperations();
+
+        //Refresh the head for c1
+        refreshHead(c1);
+
+        MongoDocumentStore documentStore = ds(c1);
+        documentStore.setCacheInvalidationTimeout(0);
+
+        InvalidationResult result = CacheInvalidator.createHierarchicalInvalidator(documentStore).invalidateCache();
+
+        //We should hit timeout
+        assertTrue(result.invalidationTimedOut);
+
+        //Only 1 entry would be invalidated
+        assertEquals(1, result.invalidationCount);
+
+        //1 query would be fired
+        assertEquals(1, result.queryCount);
+
+        //Current c1's cache should have just 1 entry for '/'
+        assertEquals(1, getCurrentCacheSize(c1));
+    }
+
     @Test
     public void testCacheInvalidationHierarchicalNotExist()
             throws CommitFailedException {
-- 
2.1.4

