Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (revision 1583596) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (working copy) @@ -16,6 +16,8 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import static org.apache.jackrabbit.oak.plugins.document.Document.ID; + import java.lang.management.ManagementFactory; import java.net.NetworkInterface; import java.util.ArrayList; @@ -26,11 +28,10 @@ import org.apache.jackrabbit.mk.api.MicroKernelException; import org.apache.jackrabbit.oak.commons.StringUtils; +import org.apache.jackrabbit.oak.stats.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.jackrabbit.oak.plugins.document.Document.ID; - /** * Information about a cluster node. */ @@ -57,9 +58,30 @@ /** * The end of the lease. */ - private static final String LEASE_END_KEY = "leaseEnd"; + protected static final String LEASE_END_KEY = "leaseEnd"; /** + * The state of the cluster. + * On proper shutdown the state should be cleared. + */ + protected static final String STATE = "state"; + + /** + * Flag to indicate whether the _lastRev recovery is in progress. + */ + protected static final String REV_RECOVERY_LOCK = "revLock"; + + /** + * Active State. + */ + private static final String ACTIVE_STATE = "active"; + + /** + * _lastRev recovery in progress + */ + protected static final String REV_RECOVERY_ON = "true"; + + /** * Additional info, such as the process id, for support. */ private static final String INFO_KEY = "info"; @@ -85,6 +107,11 @@ private static final String WORKING_DIR = System.getProperty("user.dir", ""); /** + * Only used for testing. + */ + private static Clock clock; + + /** * The number of milliseconds for a lease (1 minute by default, and * initially). */ @@ -130,9 +157,19 @@ */ private String readWriteMode; + /** + * The state of the cluster node. + */ + private String state; + + /** + * The revLock value of the cluster node. + */ + private String revRecoveryLock; + ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId) { this.id = id; - this.startTime = System.currentTimeMillis(); + this.startTime = (clock == null ? System.currentTimeMillis() : clock.getTime()); this.leaseEndTime = startTime; this.store = store; this.machineId = machineId; @@ -139,13 +176,34 @@ this.instanceId = instanceId; } + ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId, String state, + String revRecoveryLock) { + this.id = id; + this.startTime = (clock == null ? System.currentTimeMillis() : clock.getTime()); + this.leaseEndTime = startTime; + this.store = store; + this.machineId = machineId; + this.instanceId = instanceId; + this.state = state; + this.revRecoveryLock = revRecoveryLock; + } + public int getId() { return id; } /** + * Only used for testing. + * + * @param c the clock to use + */ + protected static void setClock(Clock c) { + clock = c; + } + + /** * Create a cluster node info instance for the store, with the - * + * @param store the document store (for the lease) * @return the cluster node info */ @@ -155,13 +213,14 @@ /** * Create a cluster node info instance for the store.
- * + * * @param store the document store (for the lease) * @param machineId the machine id (null for MAC address) * @param instanceId the instance id (null for current working directory) * @return the cluster node info */ - public static ClusterNodeInfo getInstance(DocumentStore store, String machineId, String instanceId) { + public static ClusterNodeInfo getInstance(DocumentStore store, String machineId, + String instanceId) { if (machineId == null) { machineId = MACHINE_ID; } @@ -174,9 +233,14 @@ update.set(ID, String.valueOf(clusterNode.id)); update.set(MACHINE_ID_KEY, clusterNode.machineId); update.set(INSTANCE_ID_KEY, clusterNode.instanceId); - update.set(LEASE_END_KEY, System.currentTimeMillis() + clusterNode.leaseTime); + update.set(LEASE_END_KEY, + (clock == null ? System.currentTimeMillis() : clock.getTime()) + + clusterNode.leaseTime); update.set(INFO_KEY, clusterNode.toString()); - boolean success = store.create(Collection.CLUSTER_NODES, Collections.singletonList(update)); + update.set(STATE, clusterNode.state); + update.set(REV_RECOVERY_LOCK, clusterNode.revRecoveryLock); + boolean success = + store.create(Collection.CLUSTER_NODES, Collections.singletonList(update)); if (success) { return clusterNode; } @@ -184,13 +248,15 @@ throw new MicroKernelException("Could not get cluster node info"); } - private static ClusterNodeInfo createInstance(DocumentStore store, String machineId, String instanceId) { - long now = System.currentTimeMillis(); + private static ClusterNodeInfo createInstance(DocumentStore store, String machineId, + String instanceId) { + long now = (clock == null ? System.currentTimeMillis() : clock.getTime()); // keys between "0" and "a" includes all possible numbers List list = store.query(Collection.CLUSTER_NODES, "0", "a", Integer.MAX_VALUE); int clusterNodeId = 0; int maxId = 0; + String state = null; for (Document doc : list) { String key = doc.getId(); int id; @@ -222,22 +288,23 @@ if (clusterNodeId == 0 || id < clusterNodeId) { // if there are multiple, use the smallest value clusterNodeId = id; + state = (String) doc.get(STATE); } } if (clusterNodeId == 0) { clusterNodeId = maxId + 1; } - return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId); + return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state, null); } /** * Renew the cluster id lease. This method needs to be called once in a while, * to ensure the same cluster id is not re-used by a different instance. - * + * * @param nextCheckMillis the millisecond offset */ public void renewLease(long nextCheckMillis) { - long now = System.currentTimeMillis(); + long now = (clock == null ? 
System.currentTimeMillis() : clock.getTime()); if (now + nextCheckMillis + nextCheckMillis < leaseEndTime) { return; } @@ -244,6 +311,7 @@ UpdateOp update = new UpdateOp("" + id, true); leaseEndTime = now + leaseTime; update.set(LEASE_END_KEY, leaseEndTime); + update.set(STATE, ACTIVE_STATE); ClusterNodeInfoDocument doc = store.createOrUpdate(Collection.CLUSTER_NODES, update); String mode = (String) doc.get(READ_WRITE_MODE_KEY); if (mode != null && !mode.equals(readWriteMode)) { @@ -263,6 +331,8 @@ public void dispose() { UpdateOp update = new UpdateOp("" + id, true); update.set(LEASE_END_KEY, null); + update.set(STATE, null); + update.set(REV_RECOVERY_LOCK, null); store.createOrUpdate(Collection.CLUSTER_NODES, update); } @@ -273,8 +343,10 @@ "machineId: " + machineId + ",\n" + "instanceId: " + instanceId + ",\n" + "pid: " + PROCESS_ID + ",\n" + - "uuid: " + uuid +",\n" + - "readWriteMode: " + readWriteMode; + "uuid: " + uuid + ",\n" + + "readWriteMode: " + readWriteMode + ",\n" + + "state: " + state + ",\n" + + "revLock: " + revRecoveryLock; } private static long getProcessId() { @@ -290,7 +362,7 @@ /** * Calculate the unique machine id. This is the lowest MAC address if * available. As an alternative, a randomly generated UUID is used. - * + * * @return the unique id */ private static String getMachineId() { Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1583596) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy) @@ -290,6 +290,7 @@ private final VersionGarbageCollector versionGarbageCollector; private final Executor executor; + private final LastRevRecoveryAgent lastRevRecoveryAgent; public DocumentNodeStore(DocumentMK.Builder builder) { this.blobStore = builder.getBlobStore(); @@ -322,6 +323,7 @@ this.branches = new UnmergedBranches(getRevisionComparator()); this.asyncDelay = builder.getAsyncDelay(); this.versionGarbageCollector = new VersionGarbageCollector(this); + this.lastRevRecoveryAgent = new LastRevRecoveryAgent(this); this.missing = new DocumentNodeState(this, "MISSING", new Revision(0, 0, 0)) { @Override public int getMemory() { @@ -378,11 +380,19 @@ new BackgroundOperation(this, isDisposed), "DocumentNodeStore background thread"); backgroundThread.setDaemon(true); + checkLastRevRecovery(); backgroundThread.start(); LOG.info("Initialized DocumentNodeStore with clusterNodeId: {}", clusterId); } + /** + * Recover _lastRev recovery if needed. 
+ */ + private void checkLastRevRecovery() { + lastRevRecoveryAgent.recover(clusterId); + } + public void dispose() { runBackgroundOperations(); if (!isDisposed.getAndSet(true)) { @@ -1707,4 +1717,8 @@ public VersionGarbageCollector getVersionGarbageCollector() { return versionGarbageCollector; } + @Nonnull + public LastRevRecoveryAgent getLastRevRecoveryAgent() { + return lastRevRecoveryAgent; + } } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java (revision 1583596) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java (working copy) @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.jackrabbit.oak.plugins.document; - -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; - -import javax.annotation.CheckForNull; - -import com.google.common.base.Predicate; -import com.google.common.collect.Maps; -import org.apache.jackrabbit.oak.commons.PathUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static com.google.common.collect.ImmutableList.of; -import static com.google.common.collect.Iterables.filter; -import static com.google.common.collect.Iterables.mergeSorted; - -public class LastRevRecovery { - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final DocumentNodeStore nodeStore; - - public LastRevRecovery(DocumentNodeStore nodeStore) { - this.nodeStore = nodeStore; - } - - public int recover(Iterator suspects, int clusterId) { - UnsavedModifications unsaved = new UnsavedModifications(); - UnsavedModifications unsavedParents = new UnsavedModifications(); - - //Map of known last rev of checked paths - Map knownLastRevs = Maps.newHashMap(); - - while (suspects.hasNext()) { - NodeDocument doc = suspects.next(); - - Revision currentLastRev = doc.getLastRev().get(clusterId); - if (currentLastRev != null) { - knownLastRevs.put(doc.getPath(), currentLastRev); - } - Revision lostLastRev = determineMissedLastRev(doc, clusterId); - - //1. Update lastRev for this doc - if (lostLastRev != null) { - unsaved.put(doc.getPath(), lostLastRev); - } - - Revision lastRevForParents = lostLastRev != null ? lostLastRev : currentLastRev; - - //If both currentLastRev and lostLastRev are null it means - //that no change is done by suspect cluster on this document - //so nothing needs to be updated. Probably it was only changed by - //other cluster nodes. 
If this node is parent of any child node which - //has been modified by cluster then that node roll up would - //add this node path to unsaved - - //2. Update lastRev for parent paths aka rollup - if (lastRevForParents != null) { - String path = doc.getPath(); - while (true) { - if (PathUtils.denotesRoot(path)) { - break; - } - path = PathUtils.getParentPath(path); - unsavedParents.put(path, lastRevForParents); - } - } - } - - for(String parentPath : unsavedParents.getPaths()){ - Revision calcLastRev = unsavedParents.get(parentPath); - Revision knownLastRev = knownLastRevs.get(parentPath); - - //Copy the calcLastRev of parent only if they have changed - //In many case it might happen that parent have consistent lastRev - //This check ensures that unnecessary updates are not made - if(knownLastRev == null - || calcLastRev.compareRevisionTime(knownLastRev) > 0){ - unsaved.put(parentPath, calcLastRev); - } - } - - //Note the size before persist as persist operation - //would empty the internal state - int size = unsaved.getPaths().size(); - - if (log.isDebugEnabled()) { - log.debug("Last revision for following documents would be updated {}", unsaved.getPaths()); - } - - //UnsavedModifications is designed to be used in concurrent - //access mode. For recovery case there is no concurrent access - //involve so just pass a new lock instance - unsaved.persist(nodeStore, new ReentrantLock()); - - log.info("Updated lastRev of [{}] documents while performing lastRev recovery for " + - "cluster node [{}]", size, clusterId); - - return size; - } - - /** - * Determines the last revision value which needs to set for given clusterId - * on the passed document. If the last rev entries are consisted - * - * @param doc NodeDocument where lastRev entries needs to be fixed - * @param clusterId clusterId for which lastRev has to be checked - * @return lastRev which needs to be updated. null if no - * updated is required i.e. 
lastRev entries are valid - */ - @CheckForNull - private Revision determineMissedLastRev(NodeDocument doc, int clusterId) { - Revision currentLastRev = doc.getLastRev().get(clusterId); - if (currentLastRev == null) { - currentLastRev = new Revision(0, 0, clusterId); - } - - ClusterPredicate cp = new ClusterPredicate(clusterId); - - //Merge sort the revs for which changes have been made - //to this doc - - //localMap always keeps the most recent valid commit entry - //per cluster node so looking into that should be sufficient - Iterable revs = mergeSorted(of( - filter(doc.getLocalCommitRoot().keySet(), cp), - filter(doc.getLocalRevisions().keySet(), cp)), - StableRevisionComparator.REVERSE - ); - - //Look for latest valid revision > currentLastRev - //if found then lastRev needs to be fixed - for (Revision rev : revs) { - if (rev.compareRevisionTime(currentLastRev) > 0) { - if (doc.isCommitted(rev)) { - return rev; - } - } else { - //No valid revision found > currentLastRev - //indicates that lastRev is valid for given clusterId - //and no further checks are required - break; - } - } - return null; - } - - private static class ClusterPredicate implements Predicate { - private final int clusterId; - - private ClusterPredicate(int clusterId) { - this.clusterId = clusterId; - } - - @Override - public boolean apply(Revision input) { - return clusterId == input.getClusterId(); - } - } -} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (working copy) @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.document; + +import static com.google.common.collect.ImmutableList.of; +import static com.google.common.collect.Iterables.filter; +import static com.google.common.collect.Iterables.mergeSorted; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; + +import javax.annotation.CheckForNull; + +import com.google.common.base.Predicate; +import com.google.common.collect.Maps; + +import org.apache.jackrabbit.oak.commons.PathUtils; +import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore; +import org.apache.jackrabbit.oak.plugins.document.mongo.MongoMissingLastRevSeeker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Utility class for recovering potential missing _lastRev updates of nodes due to crash of a node. 
+ */ +public class LastRevRecoveryAgent { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final DocumentNodeStore nodeStore; + + private final MissingLastRevSeeker missingLastRevUtil; + + LastRevRecoveryAgent(DocumentNodeStore nodeStore) { + this.nodeStore = nodeStore; + + if (nodeStore.getDocumentStore() instanceof MongoDocumentStore) { + this.missingLastRevUtil = + new MongoMissingLastRevSeeker((MongoDocumentStore) nodeStore.getDocumentStore()); + } else { + this.missingLastRevUtil = new MissingLastRevSeeker(nodeStore.getDocumentStore()); + } + } + + /** + * Recover the correct _lastRev updates for potentially missing candidate nodes. + * + * @param clusterId the cluster id for which the _lastRev updates are to be recovered + * @return the number of restored nodes + */ + public int recover(int clusterId) { + ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId); + + if (nodeInfo != null) { + Long leaseEnd = (Long) (nodeInfo.get(ClusterNodeInfo.LEASE_END_KEY)); + + // Check if _lastRev recovery is needed for this cluster node: + // state is still set (no clean shutdown cleared it) && recoveryLock not held by someone + if (nodeInfo.get(ClusterNodeInfo.STATE) != null + && nodeInfo.get(ClusterNodeInfo.REV_RECOVERY_LOCK) == null) { + + // retrieve the root document's _lastRev + NodeDocument root = missingLastRevUtil.getRoot(); + Revision lastRev = root.getLastRev().get(clusterId); + + // start time is the _lastRev timestamp of this cluster node + long startTime = lastRev.getTimestamp(); + + // end time is the leaseEnd + the asyncDelay + long endTime = leaseEnd + nodeStore.getAsyncDelay(); + + log.info("Recovering candidates modified in time range [{}, {}]", + startTime, endTime); + + return recoverCandidates(clusterId, startTime, endTime); + } + } + + log.info("No recovery needed for clusterId {}", clusterId); + return 0; + } + + /** + * Recover the correct _lastRev updates for the given candidate nodes. + * + * @param suspects the potential suspects + * @param clusterId the cluster id for which _lastRev recovery is needed + * @return the number of documents whose _lastRev was updated + */ + public int recover(Iterator<NodeDocument> suspects, int clusterId) { + UnsavedModifications unsaved = new UnsavedModifications(); + UnsavedModifications unsavedParents = new UnsavedModifications(); + + //Map of known last rev of checked paths + Map<String, Revision> knownLastRevs = Maps.newHashMap(); + + while (suspects.hasNext()) { + NodeDocument doc = suspects.next(); + + Revision currentLastRev = doc.getLastRev().get(clusterId); + if (currentLastRev != null) { + knownLastRevs.put(doc.getPath(), currentLastRev); + } + Revision lostLastRev = determineMissedLastRev(doc, clusterId); + + //1. Update lastRev for this doc + if (lostLastRev != null) { + unsaved.put(doc.getPath(), lostLastRev); + } + + Revision lastRevForParents = lostLastRev != null ? lostLastRev : currentLastRev; + + //If both currentLastRev and lostLastRev are null it means + //that no change was done by the suspect cluster node on this document + //so nothing needs to be updated. Probably it was only changed by + //other cluster nodes. If this node is the parent of any child node which + //has been modified by the suspect cluster node then the roll up below would + //add this node path to unsaved + + //2. Update lastRev for parent paths aka rollup + if (lastRevForParents != null) { + String path = doc.getPath(); + while (true) { + if (PathUtils.denotesRoot(path)) { + break; + } + path = PathUtils.getParentPath(path); + unsavedParents.put(path, lastRevForParents); + } + } + } + + for (String parentPath : unsavedParents.getPaths()) { + Revision calcLastRev = unsavedParents.get(parentPath); + Revision knownLastRev = knownLastRevs.get(parentPath); + + //Copy the calcLastRev of the parent only if it has changed + //In many cases the parent may already have a consistent lastRev + //This check ensures that unnecessary updates are not made + if (knownLastRev == null + || calcLastRev.compareRevisionTime(knownLastRev) > 0) { + unsaved.put(parentPath, calcLastRev); + } + } + + //Note the size before persist as persist operation + //would empty the internal state + int size = unsaved.getPaths().size(); + + if (log.isDebugEnabled()) { + log.debug("Last revision for following documents would be updated {}", unsaved + .getPaths()); + } + + //UnsavedModifications is designed to be used in concurrent + //access mode. For the recovery case there is no concurrent access + //involved so just pass a new lock instance + unsaved.persist(nodeStore, new ReentrantLock()); + + log.info("Updated lastRev of [{}] documents while performing lastRev recovery for " + + "cluster node [{}]", size, clusterId); + + return size; + } + + /** + * Retrieves possible candidates which have been modified in the time range and recovers the + * missing updates. + * + * @param clusterId the cluster id + * @param startTime the start time + * @param endTime the end time + * @return the number of restored nodes + */ + private int recoverCandidates(final int clusterId, final long startTime, final long endTime) { + // take a lock on the update process by setting the value of the lock to true + updateRecoveryLockOnCluster(clusterId, ClusterNodeInfo.REV_RECOVERY_ON); + + Iterable<NodeDocument> suspects = + missingLastRevUtil.getCandidates(startTime, endTime); + if (log.isDebugEnabled()) { + log.debug("_lastRev recovery candidates : {}", suspects); + } + + try { + return recover(suspects.iterator(), clusterId); + } finally { + if (suspects instanceof Closeable) { + try { + ((Closeable) suspects).close(); + } catch (IOException e) { + log.error("Error closing iterable : ", e); + } + } + // Relinquish the lock on the recovery for the cluster on the clusterInfo + updateRecoveryLockOnCluster(clusterId, null); + } + } + + /** + * Determines the last revision value which needs to be set for the given clusterId + * on the passed document. If the lastRev entries are consistent, nothing needs to be fixed. + * + * @param doc NodeDocument where lastRev entries need to be fixed + * @param clusterId clusterId for which lastRev has to be checked + * @return lastRev which needs to be updated. null if no + * update is required i.e.
lastRev entries are valid + */ + @CheckForNull + private Revision determineMissedLastRev(NodeDocument doc, int clusterId) { + Revision currentLastRev = doc.getLastRev().get(clusterId); + if (currentLastRev == null) { + currentLastRev = new Revision(0, 0, clusterId); + } + + ClusterPredicate cp = new ClusterPredicate(clusterId); + + // Merge sort the revs for which changes have been made + // to this doc + + // localMap always keeps the most recent valid commit entry + // per cluster node so looking into that should be sufficient + Iterable revs = mergeSorted(of( + filter(doc.getLocalCommitRoot().keySet(), cp), + filter(doc.getLocalRevisions().keySet(), cp)), + StableRevisionComparator.REVERSE + ); + + // Look for latest valid revision > currentLastRev + // if found then lastRev needs to be fixed + for (Revision rev : revs) { + if (rev.compareRevisionTime(currentLastRev) > 0) { + if (doc.isCommitted(rev)) { + return rev; + } + } else { + // No valid revision found > currentLastRev + // indicates that lastRev is valid for given clusterId + // and no further checks are required + break; + } + } + return null; + } + + /** + * Set/Unset lock value on the clusterInfo for the clusterId + * + * @param clusterId for which _lastRev recovery operation performed + * @param value the lock value + */ + protected void updateRecoveryLockOnCluster(int clusterId, String value) { + UpdateOp update = new UpdateOp("" + clusterId, true); + update.set(ClusterNodeInfo.REV_RECOVERY_LOCK, value); + nodeStore.getDocumentStore().createOrUpdate(Collection.CLUSTER_NODES, update); + } + + private static class ClusterPredicate implements Predicate { + private final int clusterId; + + private ClusterPredicate(int clusterId) { + this.clusterId = clusterId; + } + + @Override + public boolean apply(Revision input) { + return clusterId == input.getClusterId(); + } + } +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (working copy) @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.jackrabbit.oak.plugins.document; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + +import org.apache.jackrabbit.oak.plugins.document.util.Utils; + +/** + * Utils to retrieve _lastRev missing update candidates. + */ +public class MissingLastRevSeeker { + protected final String ROOT_PATH = "/"; + private final DocumentStore store; + + public MissingLastRevSeeker(DocumentStore store) { + this.store = store; + } + + /** + * Gets the cluster node info for the given cluster node id. + * + * @param clusterId the cluster id + * @return the cluster node info + */ + public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) { + // Fetch all documents. + List nodes = store.query(Collection.CLUSTER_NODES, "0", + "a", Integer.MAX_VALUE); + Iterable clusterIterable = + Iterables.filter(nodes, + new Predicate() { + // Return cluster info for the required clusterId + @Override + public boolean apply(ClusterNodeInfoDocument input) { + String id = input.getId(); + return (id.equals(String.valueOf(clusterId))); + } + }); + + if (clusterIterable.iterator().hasNext()) { + return clusterIterable.iterator().next(); + } + + return null; + } + + /** + * Get the candidates with modified time between the time range specified. + * + * @param startTime the start of the time range + * @param endTime the end of the time range + * @return the candidates + */ + public Iterable getCandidates(final long startTime, final long endTime) { + // Fetch all documents. + List nodes = store.query(Collection.NODES, NodeDocument.MIN_ID_VALUE, + NodeDocument.MAX_ID_VALUE, Integer.MAX_VALUE); + return Iterables.filter(nodes, new Predicate() { + @Override + public boolean apply(NodeDocument input) { + Long modified = (Long) input.get(NodeDocument.MODIFIED_IN_SECS); + return (modified != null + && (modified > TimeUnit.MILLISECONDS.toSeconds(startTime)) + && (modified < TimeUnit.MILLISECONDS.toSeconds(endTime))); + } + }); + } + + public NodeDocument getRoot() { + return store.find(Collection.NODES, Utils.getIdFromPath(ROOT_PATH)); + } +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java (revision 0) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java (working copy) @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.document.mongo; + +import static com.google.common.collect.Iterables.transform; + +import com.google.common.base.Function; +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.QueryBuilder; +import com.mongodb.ReadPreference; + +import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument; +import org.apache.jackrabbit.oak.plugins.document.Collection; +import org.apache.jackrabbit.oak.plugins.document.Commit; +import org.apache.jackrabbit.oak.plugins.document.MissingLastRevSeeker; +import org.apache.jackrabbit.oak.plugins.document.NodeDocument; +import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable; + +/** + * Mongo specific version of MissingLastRevSeeker which uses mongo queries + * to fetch candidates which may have missed '_lastRev' updates. + * + * Uses a time range to find documents modified during that interval. + */ +public class MongoMissingLastRevSeeker extends MissingLastRevSeeker { + private final MongoDocumentStore store; + + public MongoMissingLastRevSeeker(MongoDocumentStore store) { + super(store); + this.store = store; + } + + @Override + public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) { + DBObject query = + QueryBuilder + .start(NodeDocument.ID).is(String.valueOf(clusterId)).get(); + DBCursor cursor = + getClusterNodeCollection().find(query) + .setReadPreference( + ReadPreference.secondaryPreferred()); + DBObject obj = null; + if (cursor.hasNext()) { + obj = cursor.next(); + return store.convertFromDBObject(Collection.CLUSTER_NODES, obj); + } + + return null; + } + + @Override + public CloseableIterable getCandidates(final long startTime, + final long endTime) { + DBObject query = + QueryBuilder + .start(NodeDocument.MODIFIED_IN_SECS).lessThanEquals( + Commit.getModifiedInSecs(endTime)) + .put(NodeDocument.MODIFIED_IN_SECS).greaterThanEquals( + Commit.getModifiedInSecs(startTime)) + .get(); + DBObject sortFields = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, -1); + + DBCursor cursor = + getNodeCollection().find(query) + .sort(sortFields) + .setReadPreference( + ReadPreference.secondaryPreferred()); + return CloseableIterable.wrap(transform(cursor, new Function() { + @Override + public NodeDocument apply(DBObject input) { + return store.convertFromDBObject(Collection.NODES, input); + } + }), cursor); + } + + private DBCollection getNodeCollection() { + return store.getDBCollection(Collection.NODES); + } + + private DBCollection getClusterNodeCollection() { + return store.getDBCollection(Collection.CLUSTER_NODES); + } +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java =================================================================== --- 
oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (revision 1583596) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (working copy) @@ -183,7 +183,7 @@ *

* Value: the revision. */ - private static final String LAST_REV = "_lastRev"; + public static final String LAST_REV = "_lastRev"; /** * Flag indicating that there are child nodes present. Its just used as a hint. Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java (revision 1583596) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java (working copy) @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.jackrabbit.oak.stats.Clock; /** * A revision. */ @@ -58,6 +59,20 @@ */ private final boolean branch; + /** Only set for testing */ + private static Clock clock; + + /** + * + * Only to be used for testing. + * Do Not Use Otherwise + * + * + * @param c - the clock + */ + protected static void setClock(Clock c) { + clock = c; + } public Revision(long timestamp, int counter, int clusterId) { this(timestamp, counter, clusterId, false); } @@ -150,6 +165,9 @@ */ public static long getCurrentTimestamp() { long timestamp = System.currentTimeMillis(); + if (clock != null) { + timestamp = clock.getTime(); + } if (timestamp < lastTimestamp) { // protect against decreases in the system time, // time machines, and other fluctuations in the time continuum Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java (revision 1583596) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java (working copy) @@ -134,5 +134,14 @@ return false; } } + + @Override + public void dispose() { + try{ + MongoConnection connection = new MongoConnection(uri); + connection.getDB().dropDatabase(); + } catch(Exception e) { + } + } } } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java (revision 1583596) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java (working copy) @@ -90,7 +90,7 @@ //lastRev should not be updated for C #2 assertNull(y1.getLastRev().get(2)); - LastRevRecovery recovery = new LastRevRecovery(ds1); + LastRevRecoveryAgent recovery = new LastRevRecoveryAgent(ds1); //Do not pass y1 but still y1 should be updated recovery.recover(Iterators.forArray(x1,z1), 2); Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java (revision 0) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java (working copy) @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.document; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import com.google.common.collect.Lists; + +import org.apache.jackrabbit.oak.stats.Clock; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Tests the restore of potentially missing _lastRev updates. + */ +@RunWith(Parameterized.class) +public class LastRevSingleNodeRecoveryTest { + private DocumentStoreFixture fixture; + + private Clock clock; + + private DocumentMK mk; + + public LastRevSingleNodeRecoveryTest(DocumentStoreFixture fixture) { + this.fixture = fixture; + } + + @Parameterized.Parameters + public static Collection fixtures() throws IOException { + List fixtures = Lists.newArrayList(); + + DocumentStoreFixture mongo = new DocumentStoreFixture.MongoFixture(); + if (mongo.isAvailable()) { + fixtures.add(new Object[] {mongo}); + } + return fixtures; + } + + private DocumentMK createMK(int clusterId) throws InterruptedException { + clock = new Clock.Virtual(); + return openMK(clusterId, fixture.createDocumentStore()); + } + + private DocumentMK openMK(int clusterId, DocumentStore store) throws InterruptedException { + clock.waitUntil(System.currentTimeMillis()); + + // Sets the clock for testing + ClusterNodeInfo.setClock(clock); + Revision.setClock(clock); + + DocumentMK.Builder builder = new DocumentMK.Builder(); + builder.setAsyncDelay(0) + .setClusterId(clusterId) + .clock(clock) + .setDocumentStore(store); + mk = builder.open(); + clock.waitUntil(Revision.getCurrentTimestamp()); + + return mk; + } + + @Before + public void setUp() throws InterruptedException { + try { + mk = createMK(0); + Assume.assumeNotNull(mk); + + // initialize node hierarchy + mk.commit("/", "+\"x\" : { \"y\": {\"z\":{} } }", null, null); + mk.commit("/", "+\"a\" : { \"b\": {\"c\": {}} }", null, null); + } catch (Exception e) { + Assume.assumeNoException(e); + } + } + + @Test + public void testLastRevRestoreOnNodeStart() throws Exception { + // pending updates + setupScenario(); + + // renew lease + clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10); + mk.getClusterInfo().renewLease(0); + + // so that the current time is more than the current lease end + clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000); + // Recreate mk instance, to simulate fail condition and recovery on start + mk = openMK(0, mk.getNodeStore().getDocumentStore()); + + int pendingCount = mk.getPendingWriteCount(); + + // so that the current time is more than the current lease end + clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000); + // Immediately check again, now should not have done any changes. 
+ LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent(); + /** Now there should have been pendingCount updates **/ + assertEquals(pendingCount, recoveryAgent.recover(mk.getClusterInfo().getId())); + } + + @Test + public void testLastRevRestore() throws Exception { + setupScenario(); + + int pendingCount = mk.getPendingWriteCount(); + // so that the current time is more than the current lease end + clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000); + LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent(); + + /** All pending updates should have been restored **/ + assertEquals(pendingCount, recoveryAgent.recover(mk.getClusterInfo().getId())); + } + + + @Test + public void testNoMissingUpdates() throws Exception { + setupScenario(); + mk.backgroundWrite(); + + // move the time forward and do another update of the root node so that only 2 nodes are + // candidates + clock.waitUntil(clock.getTime() + 5000); + mk.commit("/", "^\"a/key2\" : \"value2\"", null, null); + mk.backgroundWrite(); + + clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime()); + mk.getClusterInfo().renewLease(0); + + // should be 0 as all pending changes have already been written back + int pendingCount = mk.getPendingWriteCount(); + LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent(); + + clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime()); + /** There should have been no updates **/ + assertEquals(pendingCount, recoveryAgent.recover(mk.getClusterInfo().getId())); + } + + private void setupScenario() throws InterruptedException { + // add some nodes which won't be returned + mk.commit("/", "+\"u\" : { \"v\": {}}", null, null); + mk.commit("/u", "^\"v/key1\" : \"value1\"", null, null); + + // move the time forward so that the root gets updated + clock.waitUntil(clock.getTime() + 5000); + mk.commit("/", "^\"a/key1\" : \"value1\"", null, null); + mk.backgroundWrite(); + + // move the time forward to have a new node under root + clock.waitUntil(clock.getTime() + 5000); + mk.commit("/", "+\"p\":{}", null, null); + + // move the time forward to write all pending changes + clock.waitUntil(clock.getTime() + 5000); + mk.backgroundWrite(); + + // renew lease one last time + clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime()); + mk.getClusterInfo().renewLease(0); + + clock.waitUntil(clock.getTime() + 5000); + // add nodes whose _lastRev updates stay pending (no background write follows) + addNodes(); + } + + /** + * Adds nodes whose _lastRev updates remain pending and therefore need recovery. + */ + private void addNodes() { + // change node /a/b/c by adding a property + mk.commit("/a/b", "^\"c/key1\" : \"value1\"", null, null); + // add node /a/b/c/d + mk.commit("/a/b/c", "+\"d\":{}", null, null); + // add node /a/b/f + mk.commit("/a/b", "+\"f\" : {}", null, null); + // add node /a/b/f/e + mk.commit("/a/b/f", "+\"e\": {}", null, null); + // change node /x/y/z + mk.commit("/x/y", "^\"z/key1\" : \"value1\"", null, null); + } + + @After + public void tearDown() throws Exception { + Revision.setClock(null); + ClusterNodeInfo.setClock(null); + mk.dispose(); + fixture.dispose(); + } +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property
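Reviewer note (not part of the patch): LastRevRecoveryAgent.recover(int) only starts a recovery when the crashed node's cluster document still carries a "state" value (a clean shutdown clears it in dispose()) and its "revLock" is not held, and it then scans documents modified between the root document's _lastRev timestamp for that cluster node and leaseEnd + asyncDelay. The following is a minimal, self-contained Java sketch of just that decision; RecoveryWindowSketch and decide() are illustrative names only and do not exist in Oak or in this patch.

// Illustrative sketch only -- not part of Oak or of this patch.
public class RecoveryWindowSketch {

    /** Result of the recovery decision: whether to run recovery and which time window to scan. */
    static final class Decision {
        final boolean recoveryNeeded;
        final long startMillis; // root document _lastRev timestamp of the crashed cluster node
        final long endMillis;   // leaseEnd + asyncDelay

        Decision(boolean recoveryNeeded, long startMillis, long endMillis) {
            this.recoveryNeeded = recoveryNeeded;
            this.startMillis = startMillis;
            this.endMillis = endMillis;
        }
    }

    // Mirrors the gate in LastRevRecoveryAgent.recover(int): recovery runs only if the
    // cluster node document still has a "state" (no clean shutdown cleared it) and no
    // other node holds the "revLock".
    static Decision decide(String state, String revLock, long rootLastRevTimestamp,
                           long leaseEnd, long asyncDelay) {
        boolean needed = state != null && revLock == null;
        return new Decision(needed, rootLastRevTimestamp, leaseEnd + asyncDelay);
    }

    public static void main(String[] args) {
        // A crashed node: "state" was never cleared and nobody is recovering it yet.
        Decision d = decide("active", null, 1000000L, 1060000L, 1000L);
        System.out.println(d.recoveryNeeded + " [" + d.startMillis + ", " + d.endMillis + "]");
    }
}

MissingLastRevSeeker.getCandidates() (and its Mongo variant) then selects documents whose _modified falls inside that [start, end] window, which is the same interval computed above.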