Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (revision 1583596)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (working copy)
@@ -16,6 +16,8 @@
*/
package org.apache.jackrabbit.oak.plugins.document;
+import static org.apache.jackrabbit.oak.plugins.document.Document.ID;
+
import java.lang.management.ManagementFactory;
import java.net.NetworkInterface;
import java.util.ArrayList;
@@ -26,11 +28,10 @@
import org.apache.jackrabbit.mk.api.MicroKernelException;
import org.apache.jackrabbit.oak.commons.StringUtils;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.jackrabbit.oak.plugins.document.Document.ID;
-
/**
* Information about a cluster node.
*/
@@ -57,9 +58,30 @@
/**
* The end of the lease.
*/
- private static final String LEASE_END_KEY = "leaseEnd";
+ protected static final String LEASE_END_KEY = "leaseEnd";
/**
+ * The state of the cluster node.
+ * On a proper shutdown the state is cleared.
+ */
+ protected static final String STATE = "state";
+
+ /**
+ * Flag to indicate whether the _lastRev recovery is in progress.
+ */
+ protected static final String REV_RECOVERY_LOCK = "revLock";
+
+ /**
+ * Active State.
+ */
+ private static final String ACTIVE_STATE = "active";
+
+ /**
+ * _lastRev recovery in progress
+ */
+ protected static final String REV_RECOVERY_ON = "true";
+
+ /**
* Additional info, such as the process id, for support.
*/
private static final String INFO_KEY = "info";
@@ -85,6 +107,11 @@
private static final String WORKING_DIR = System.getProperty("user.dir", "");
/**
+ * Only used for testing.
+ */
+ private static Clock clock;
+
+ /**
* The number of milliseconds for a lease (1 minute by default, and
* initially).
*/
@@ -130,9 +157,19 @@
*/
private String readWriteMode;
+ /**
+ * The state of the cluster node.
+ */
+ private String state;
+
+ /**
+ * The revLock value of the cluster node.
+ */
+ private String revRecoveryLock;
+
ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId) {
this.id = id;
- this.startTime = System.currentTimeMillis();
+ this.startTime = (clock == null ? System.currentTimeMillis() : clock.getTime());
this.leaseEndTime = startTime;
this.store = store;
this.machineId = machineId;
@@ -139,13 +176,34 @@
this.instanceId = instanceId;
}
+ ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId, String state,
+ String revRecoveryLock) {
+ this.id = id;
+ this.startTime = (clock == null ? System.currentTimeMillis() : clock.getTime());
+ this.leaseEndTime = startTime;
+ this.store = store;
+ this.machineId = machineId;
+ this.instanceId = instanceId;
+ this.state = state;
+ this.revRecoveryLock = revRecoveryLock;
+ }
+
public int getId() {
return id;
}
/**
+ * Only used for testing.
+ *
+ * @param c the clock to use
+ */
+ protected static void setClock(Clock c) {
+ clock = c;
+ }
+
+ /**
* Create a cluster node info instance for the store, with the
- *
+ *
* @param store the document store (for the lease)
* @return the cluster node info
*/
@@ -155,13 +213,14 @@
/**
* Create a cluster node info instance for the store.
- *
+ *
* @param store the document store (for the lease)
* @param machineId the machine id (null for MAC address)
* @param instanceId the instance id (null for current working directory)
* @return the cluster node info
*/
- public static ClusterNodeInfo getInstance(DocumentStore store, String machineId, String instanceId) {
+ public static ClusterNodeInfo getInstance(DocumentStore store, String machineId,
+ String instanceId) {
if (machineId == null) {
machineId = MACHINE_ID;
}
@@ -174,9 +233,14 @@
update.set(ID, String.valueOf(clusterNode.id));
update.set(MACHINE_ID_KEY, clusterNode.machineId);
update.set(INSTANCE_ID_KEY, clusterNode.instanceId);
- update.set(LEASE_END_KEY, System.currentTimeMillis() + clusterNode.leaseTime);
+ update.set(LEASE_END_KEY,
+ (clock == null ? System.currentTimeMillis() : clock.getTime())
+ + clusterNode.leaseTime);
update.set(INFO_KEY, clusterNode.toString());
- boolean success = store.create(Collection.CLUSTER_NODES, Collections.singletonList(update));
+ update.set(STATE, clusterNode.state);
+ update.set(REV_RECOVERY_LOCK, clusterNode.revRecoveryLock);
+ boolean success =
+ store.create(Collection.CLUSTER_NODES, Collections.singletonList(update));
if (success) {
return clusterNode;
}
@@ -184,13 +248,15 @@
throw new MicroKernelException("Could not get cluster node info");
}
- private static ClusterNodeInfo createInstance(DocumentStore store, String machineId, String instanceId) {
- long now = System.currentTimeMillis();
+ private static ClusterNodeInfo createInstance(DocumentStore store, String machineId,
+ String instanceId) {
+ long now = (clock == null ? System.currentTimeMillis() : clock.getTime());
// keys between "0" and "a" includes all possible numbers
List list = store.query(Collection.CLUSTER_NODES,
"0", "a", Integer.MAX_VALUE);
int clusterNodeId = 0;
int maxId = 0;
+ String state = null;
for (Document doc : list) {
String key = doc.getId();
int id;
@@ -222,22 +288,23 @@
if (clusterNodeId == 0 || id < clusterNodeId) {
// if there are multiple, use the smallest value
clusterNodeId = id;
+ state = (String) doc.get(STATE);
}
}
if (clusterNodeId == 0) {
clusterNodeId = maxId + 1;
}
- return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId);
+ return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state, null);
}
/**
* Renew the cluster id lease. This method needs to be called once in a while,
* to ensure the same cluster id is not re-used by a different instance.
- *
+ *
* @param nextCheckMillis the millisecond offset
*/
public void renewLease(long nextCheckMillis) {
- long now = System.currentTimeMillis();
+ long now = (clock == null ? System.currentTimeMillis() : clock.getTime());
if (now + nextCheckMillis + nextCheckMillis < leaseEndTime) {
return;
}
@@ -244,6 +311,7 @@
UpdateOp update = new UpdateOp("" + id, true);
leaseEndTime = now + leaseTime;
update.set(LEASE_END_KEY, leaseEndTime);
+ update.set(STATE, ACTIVE_STATE);
ClusterNodeInfoDocument doc = store.createOrUpdate(Collection.CLUSTER_NODES, update);
String mode = (String) doc.get(READ_WRITE_MODE_KEY);
if (mode != null && !mode.equals(readWriteMode)) {
@@ -263,6 +331,8 @@
public void dispose() {
UpdateOp update = new UpdateOp("" + id, true);
update.set(LEASE_END_KEY, null);
+ update.set(STATE, null);
+ update.set(REV_RECOVERY_LOCK, null);
store.createOrUpdate(Collection.CLUSTER_NODES, update);
}
@@ -273,8 +343,10 @@
"machineId: " + machineId + ",\n" +
"instanceId: " + instanceId + ",\n" +
"pid: " + PROCESS_ID + ",\n" +
- "uuid: " + uuid +",\n" +
- "readWriteMode: " + readWriteMode;
+ "uuid: " + uuid + ",\n" +
+ "readWriteMode: " + readWriteMode + ",\n" +
+ "state: " + state + ",\n" +
+ "revLock: " + revRecoveryLock;
}
private static long getProcessId() {
@@ -290,7 +362,7 @@
/**
* Calculate the unique machine id. This is the lowest MAC address if
* available. As an alternative, a randomly generated UUID is used.
- *
+ *
* @return the unique id
*/
private static String getMachineId() {
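For context on the new setClock(...) hook above: a minimal, hedged sketch of how a test in the same package could combine it with oak-core's Clock.Virtual to simulate lease expiry without real waiting. The helper name and the 70 second jump are illustrative, not part of this patch.

    // Sketch only: drives virtual time through the test hooks added by this patch.
    void withVirtualClock() throws Exception {
        Clock.Virtual virtualClock = new Clock.Virtual();
        virtualClock.waitUntil(System.currentTimeMillis());         // start virtual time at "now"
        ClusterNodeInfo.setClock(virtualClock);                      // test hook added above
        Revision.setClock(virtualClock);                             // see the Revision.java change below
        // ... exercise the DocumentNodeStore ...
        virtualClock.waitUntil(virtualClock.getTime() + 70 * 1000);  // move past the 60s default lease
    }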
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1583596)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy)
@@ -290,6 +290,7 @@
private final VersionGarbageCollector versionGarbageCollector;
private final Executor executor;
+ private final LastRevRecoveryAgent lastRevRecoveryAgent;
public DocumentNodeStore(DocumentMK.Builder builder) {
this.blobStore = builder.getBlobStore();
@@ -322,6 +323,7 @@
this.branches = new UnmergedBranches(getRevisionComparator());
this.asyncDelay = builder.getAsyncDelay();
this.versionGarbageCollector = new VersionGarbageCollector(this);
+ this.lastRevRecoveryAgent = new LastRevRecoveryAgent(this);
this.missing = new DocumentNodeState(this, "MISSING", new Revision(0, 0, 0)) {
@Override
public int getMemory() {
@@ -378,11 +380,19 @@
new BackgroundOperation(this, isDisposed),
"DocumentNodeStore background thread");
backgroundThread.setDaemon(true);
+ checkLastRevRecovery();
backgroundThread.start();
LOG.info("Initialized DocumentNodeStore with clusterNodeId: {}", clusterId);
}
+ /**
+ * Perform _lastRev recovery for this cluster node if needed.
+ */
+ private void checkLastRevRecovery() {
+ lastRevRecoveryAgent.recover(clusterId);
+ }
+
public void dispose() {
runBackgroundOperations();
if (!isDisposed.getAndSet(true)) {
@@ -1707,4 +1717,8 @@
public VersionGarbageCollector getVersionGarbageCollector() {
return versionGarbageCollector;
}
+ @Nonnull
+ public LastRevRecoveryAgent getLastRevRecoveryAgent() {
+ return lastRevRecoveryAgent;
+ }
}
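For orientation, a rough sketch (not part of this patch) of how the new accessor could be used to trigger recovery on behalf of another, crashed cluster node; the builder wiring and cluster ids are placeholders.

    // Sketch only: sharedStore is the DocumentStore used by all cluster nodes.
    DocumentNodeStore ns = new DocumentMK.Builder()
            .setDocumentStore(sharedStore)
            .setClusterId(1)
            .getNodeStore();          // startup already runs recovery for this node's own clusterId
    int restored = ns.getLastRevRecoveryAgent().recover(2);  // recover for crashed cluster node 2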
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java (revision 1583596)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java (working copy)
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.jackrabbit.oak.plugins.document;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.annotation.CheckForNull;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Maps;
-import org.apache.jackrabbit.oak.commons.PathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static com.google.common.collect.ImmutableList.of;
-import static com.google.common.collect.Iterables.filter;
-import static com.google.common.collect.Iterables.mergeSorted;
-
-public class LastRevRecovery {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final DocumentNodeStore nodeStore;
-
- public LastRevRecovery(DocumentNodeStore nodeStore) {
- this.nodeStore = nodeStore;
- }
-
- public int recover(Iterator suspects, int clusterId) {
- UnsavedModifications unsaved = new UnsavedModifications();
- UnsavedModifications unsavedParents = new UnsavedModifications();
-
- //Map of known last rev of checked paths
- Map knownLastRevs = Maps.newHashMap();
-
- while (suspects.hasNext()) {
- NodeDocument doc = suspects.next();
-
- Revision currentLastRev = doc.getLastRev().get(clusterId);
- if (currentLastRev != null) {
- knownLastRevs.put(doc.getPath(), currentLastRev);
- }
- Revision lostLastRev = determineMissedLastRev(doc, clusterId);
-
- //1. Update lastRev for this doc
- if (lostLastRev != null) {
- unsaved.put(doc.getPath(), lostLastRev);
- }
-
- Revision lastRevForParents = lostLastRev != null ? lostLastRev : currentLastRev;
-
- //If both currentLastRev and lostLastRev are null it means
- //that no change is done by suspect cluster on this document
- //so nothing needs to be updated. Probably it was only changed by
- //other cluster nodes. If this node is parent of any child node which
- //has been modified by cluster then that node roll up would
- //add this node path to unsaved
-
- //2. Update lastRev for parent paths aka rollup
- if (lastRevForParents != null) {
- String path = doc.getPath();
- while (true) {
- if (PathUtils.denotesRoot(path)) {
- break;
- }
- path = PathUtils.getParentPath(path);
- unsavedParents.put(path, lastRevForParents);
- }
- }
- }
-
- for(String parentPath : unsavedParents.getPaths()){
- Revision calcLastRev = unsavedParents.get(parentPath);
- Revision knownLastRev = knownLastRevs.get(parentPath);
-
- //Copy the calcLastRev of parent only if they have changed
- //In many case it might happen that parent have consistent lastRev
- //This check ensures that unnecessary updates are not made
- if(knownLastRev == null
- || calcLastRev.compareRevisionTime(knownLastRev) > 0){
- unsaved.put(parentPath, calcLastRev);
- }
- }
-
- //Note the size before persist as persist operation
- //would empty the internal state
- int size = unsaved.getPaths().size();
-
- if (log.isDebugEnabled()) {
- log.debug("Last revision for following documents would be updated {}", unsaved.getPaths());
- }
-
- //UnsavedModifications is designed to be used in concurrent
- //access mode. For recovery case there is no concurrent access
- //involve so just pass a new lock instance
- unsaved.persist(nodeStore, new ReentrantLock());
-
- log.info("Updated lastRev of [{}] documents while performing lastRev recovery for " +
- "cluster node [{}]", size, clusterId);
-
- return size;
- }
-
- /**
- * Determines the last revision value which needs to set for given clusterId
- * on the passed document. If the last rev entries are consisted
- *
- * @param doc NodeDocument where lastRev entries needs to be fixed
- * @param clusterId clusterId for which lastRev has to be checked
- * @return lastRev which needs to be updated. null if no
- * updated is required i.e. lastRev entries are valid
- */
- @CheckForNull
- private Revision determineMissedLastRev(NodeDocument doc, int clusterId) {
- Revision currentLastRev = doc.getLastRev().get(clusterId);
- if (currentLastRev == null) {
- currentLastRev = new Revision(0, 0, clusterId);
- }
-
- ClusterPredicate cp = new ClusterPredicate(clusterId);
-
- //Merge sort the revs for which changes have been made
- //to this doc
-
- //localMap always keeps the most recent valid commit entry
- //per cluster node so looking into that should be sufficient
- Iterable revs = mergeSorted(of(
- filter(doc.getLocalCommitRoot().keySet(), cp),
- filter(doc.getLocalRevisions().keySet(), cp)),
- StableRevisionComparator.REVERSE
- );
-
- //Look for latest valid revision > currentLastRev
- //if found then lastRev needs to be fixed
- for (Revision rev : revs) {
- if (rev.compareRevisionTime(currentLastRev) > 0) {
- if (doc.isCommitted(rev)) {
- return rev;
- }
- } else {
- //No valid revision found > currentLastRev
- //indicates that lastRev is valid for given clusterId
- //and no further checks are required
- break;
- }
- }
- return null;
- }
-
- private static class ClusterPredicate implements Predicate {
- private final int clusterId;
-
- private ClusterPredicate(int clusterId) {
- this.clusterId = clusterId;
- }
-
- @Override
- public boolean apply(Revision input) {
- return clusterId == input.getClusterId();
- }
- }
-}
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (working copy)
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static com.google.common.collect.ImmutableList.of;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.mergeSorted;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.annotation.CheckForNull;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
+
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoMissingLastRevSeeker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for recovering potentially missing _lastRev updates after a cluster node crash.
+ */
+public class LastRevRecoveryAgent {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final DocumentNodeStore nodeStore;
+
+ private final MissingLastRevSeeker missingLastRevUtil;
+
+ LastRevRecoveryAgent(DocumentNodeStore nodeStore) {
+ this.nodeStore = nodeStore;
+
+ if (nodeStore.getDocumentStore() instanceof MongoDocumentStore) {
+ this.missingLastRevUtil =
+ new MongoMissingLastRevSeeker((MongoDocumentStore) nodeStore.getDocumentStore());
+ } else {
+ this.missingLastRevUtil = new MissingLastRevSeeker(nodeStore.getDocumentStore());
+ }
+ }
+
+ /**
+ * Recover the correct _lastRev updates for potentially missing candidate nodes.
+ *
+ * @param clusterId the cluster id for which the _lastRev updates are to be recovered
+ * @return the number of documents whose _lastRev was restored
+ */
+ public int recover(int clusterId) {
+ ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId);
+
+ if (nodeInfo != null) {
+ Long leaseEnd = (Long) (nodeInfo.get(ClusterNodeInfo.LEASE_END_KEY));
+
+ // Check if _lastRev recovery is needed for this cluster node:
+ // state still set (no clean shutdown) && recovery lock not held by anyone
+ if (nodeInfo.get(ClusterNodeInfo.STATE) != null
+ && nodeInfo.get(ClusterNodeInfo.REV_RECOVERY_LOCK) == null) {
+
+ // retrieve the root document's _lastRev
+ NodeDocument root = missingLastRevUtil.getRoot();
+ Revision lastRev = root.getLastRev().get(clusterId);
+
+ // start time is the _lastRev timestamp of this cluster node
+ long startTime = lastRev.getTimestamp();
+
+ // End time is the leaseEnd plus the asyncDelay
+ long endTime = leaseEnd + nodeStore.getAsyncDelay();
+
+ log.info("Recovering candidates modified in time range : {0}",
+ new Object[] {startTime, endTime});
+
+ return recoverCandidates(clusterId, startTime, endTime);
+ }
+ }
+
+ log.info("No recovery needed for clusterId");
+ return 0;
+ }
+
+ /**
+ * Recover the correct _lastRev updates for the given candidate nodes.
+ *
+ * @param suspects the potential suspects
+ * @param clusterId the cluster id for which _lastRev recovery is needed
+ * @return the number of documents whose _lastRev was updated
+ */
+ public int recover(Iterator<NodeDocument> suspects, int clusterId) {
+ UnsavedModifications unsaved = new UnsavedModifications();
+ UnsavedModifications unsavedParents = new UnsavedModifications();
+
+ //Map of known last rev of checked paths
+ Map<String, Revision> knownLastRevs = Maps.newHashMap();
+
+ while (suspects.hasNext()) {
+ NodeDocument doc = suspects.next();
+
+ Revision currentLastRev = doc.getLastRev().get(clusterId);
+ if (currentLastRev != null) {
+ knownLastRevs.put(doc.getPath(), currentLastRev);
+ }
+ Revision lostLastRev = determineMissedLastRev(doc, clusterId);
+
+ //1. Update lastRev for this doc
+ if (lostLastRev != null) {
+ unsaved.put(doc.getPath(), lostLastRev);
+ }
+
+ Revision lastRevForParents = lostLastRev != null ? lostLastRev : currentLastRev;
+
+ //If both currentLastRev and lostLastRev are null it means
+ //that no change is done by suspect cluster on this document
+ //so nothing needs to be updated. Probably it was only changed by
+ //other cluster nodes. If this node is parent of any child node which
+ //has been modified by cluster then that node roll up would
+ //add this node path to unsaved
+
+ //2. Update lastRev for parent paths aka rollup
+ if (lastRevForParents != null) {
+ String path = doc.getPath();
+ while (true) {
+ if (PathUtils.denotesRoot(path)) {
+ break;
+ }
+ path = PathUtils.getParentPath(path);
+ unsavedParents.put(path, lastRevForParents);
+ }
+ }
+ }
+
+ for (String parentPath : unsavedParents.getPaths()) {
+ Revision calcLastRev = unsavedParents.get(parentPath);
+ Revision knownLastRev = knownLastRevs.get(parentPath);
+
+ //Copy the calcLastRev of parent only if they have changed
+ //In many case it might happen that parent have consistent lastRev
+ //This check ensures that unnecessary updates are not made
+ if (knownLastRev == null
+ || calcLastRev.compareRevisionTime(knownLastRev) > 0) {
+ unsaved.put(parentPath, calcLastRev);
+ }
+ }
+
+ //Note the size before persist as persist operation
+ //would empty the internal state
+ int size = unsaved.getPaths().size();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Last revision for following documents would be updated {}", unsaved
+ .getPaths());
+ }
+
+ //UnsavedModifications is designed to be used in concurrent
+ //access mode. For recovery case there is no concurrent access
+ //involve so just pass a new lock instance
+ unsaved.persist(nodeStore, new ReentrantLock());
+
+ log.info("Updated lastRev of [{}] documents while performing lastRev recovery for " +
+ "cluster node [{}]", size, clusterId);
+
+ return size;
+ }
+
+ /**
+ * Retrieves possible candidates which have been modified in the time range and recovers the
+ * missing updates.
+ *
+ * @param clusterId the cluster id
+ * @param startTime the start time
+ * @param endTime the end time
+ * @return the number of documents whose _lastRev was restored
+ */
+ private int recoverCandidates(final int clusterId, final long startTime, final long endTime) {
+ // take a lock on the update process by setting the value of the lock to true
+ updateRecoveryLockOnCluster(clusterId, ClusterNodeInfo.REV_RECOVERY_ON);
+
+ Iterable<NodeDocument> suspects =
+ missingLastRevUtil.getCandidates(startTime, endTime);
+ if (log.isDebugEnabled()) {
+ log.debug("_lastRev recovery candidates : {}", suspects);
+ }
+
+ try {
+ return recover(suspects.iterator(), clusterId);
+ } finally {
+ if (suspects instanceof Closeable) {
+ try {
+ ((Closeable) suspects).close();
+ } catch (IOException e) {
+ log.error("Error closing iterable : ", e);
+ }
+ }
+ // Relinquish the lock on the recovery for the cluster on the clusterInfo
+ updateRecoveryLockOnCluster(clusterId, null);
+ }
+ }
+
+ /**
+ * Determines the last revision value which needs to be set for the given clusterId
+ * on the passed document. Returns null when the existing lastRev entries are consistent.
+ *
+ * @param doc NodeDocument whose lastRev entries need to be fixed
+ * @param clusterId clusterId for which lastRev has to be checked
+ * @return lastRev which needs to be updated, or null if no
+ * update is required, i.e. the lastRev entries are valid
+ */
+ @CheckForNull
+ private Revision determineMissedLastRev(NodeDocument doc, int clusterId) {
+ Revision currentLastRev = doc.getLastRev().get(clusterId);
+ if (currentLastRev == null) {
+ currentLastRev = new Revision(0, 0, clusterId);
+ }
+
+ ClusterPredicate cp = new ClusterPredicate(clusterId);
+
+ // Merge sort the revs for which changes have been made
+ // to this doc
+
+ // localMap always keeps the most recent valid commit entry
+ // per cluster node so looking into that should be sufficient
+ Iterable<Revision> revs = mergeSorted(of(
+ filter(doc.getLocalCommitRoot().keySet(), cp),
+ filter(doc.getLocalRevisions().keySet(), cp)),
+ StableRevisionComparator.REVERSE
+ );
+
+ // Look for latest valid revision > currentLastRev
+ // if found then lastRev needs to be fixed
+ for (Revision rev : revs) {
+ if (rev.compareRevisionTime(currentLastRev) > 0) {
+ if (doc.isCommitted(rev)) {
+ return rev;
+ }
+ } else {
+ // No valid revision found > currentLastRev
+ // indicates that lastRev is valid for given clusterId
+ // and no further checks are required
+ break;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Sets or clears the recovery lock value on the cluster node entry for the given clusterId.
+ *
+ * @param clusterId the cluster id for which the _lastRev recovery operation is performed
+ * @param value the lock value
+ */
+ protected void updateRecoveryLockOnCluster(int clusterId, String value) {
+ UpdateOp update = new UpdateOp("" + clusterId, true);
+ update.set(ClusterNodeInfo.REV_RECOVERY_LOCK, value);
+ nodeStore.getDocumentStore().createOrUpdate(Collection.CLUSTER_NODES, update);
+ }
+
+ private static class ClusterPredicate implements Predicate<Revision> {
+ private final int clusterId;
+
+ private ClusterPredicate(int clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ @Override
+ public boolean apply(Revision input) {
+ return clusterId == input.getClusterId();
+ }
+ }
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
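To make the parent rollup above concrete: if the crashed node's only missed update was on /a/b/c, the agent writes the recovered revision to that document and also to /a/b, /a and the root, so ancestor _lastRev entries catch up. A hedged usage sketch with explicit suspects, mirroring the updated test further down (ds, x1 and z1 are placeholders for a DocumentNodeStore and suspect NodeDocuments):

    // Sketch only: recover _lastRev entries of crashed cluster node 2 from explicit suspects.
    LastRevRecoveryAgent agent = new LastRevRecoveryAgent(ds);
    int updated = agent.recover(Iterators.forArray(x1, z1), 2);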
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (working copy)
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+
+/**
+ * Utility to retrieve candidates that may have missed _lastRev updates.
+ */
+public class MissingLastRevSeeker {
+ protected final String ROOT_PATH = "/";
+ private final DocumentStore store;
+
+ public MissingLastRevSeeker(DocumentStore store) {
+ this.store = store;
+ }
+
+ /**
+ * Gets the cluster node info for the given cluster node id.
+ *
+ * @param clusterId the cluster id
+ * @return the cluster node info
+ */
+ public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) {
+ // Fetch all cluster node entries and pick the one for the requested id.
+ List<ClusterNodeInfoDocument> nodes = store.query(Collection.CLUSTER_NODES, "0",
+ "a", Integer.MAX_VALUE);
+ Iterable<ClusterNodeInfoDocument> clusterIterable =
+ Iterables.filter(nodes,
+ new Predicate<ClusterNodeInfoDocument>() {
+ // Return cluster info for the required clusterId
+ @Override
+ public boolean apply(ClusterNodeInfoDocument input) {
+ String id = input.getId();
+ return (id.equals(String.valueOf(clusterId)));
+ }
+ });
+
+ if (clusterIterable.iterator().hasNext()) {
+ return clusterIterable.iterator().next();
+ }
+
+ return null;
+ }
+
+ /**
+ * Get the candidates whose modified time lies within the specified time range.
+ *
+ * @param startTime the start of the time range
+ * @param endTime the end of the time range
+ * @return the candidates
+ */
+ public Iterable<NodeDocument> getCandidates(final long startTime, final long endTime) {
+ // Fetch all node documents and filter by the modified time range.
+ List<NodeDocument> nodes = store.query(Collection.NODES, NodeDocument.MIN_ID_VALUE,
+ NodeDocument.MAX_ID_VALUE, Integer.MAX_VALUE);
+ return Iterables.filter(nodes, new Predicate<NodeDocument>() {
+ @Override
+ public boolean apply(NodeDocument input) {
+ Long modified = (Long) input.get(NodeDocument.MODIFIED_IN_SECS);
+ return (modified != null
+ && (modified > TimeUnit.MILLISECONDS.toSeconds(startTime))
+ && (modified < TimeUnit.MILLISECONDS.toSeconds(endTime)));
+ }
+ });
+ }
+
+ public NodeDocument getRoot() {
+ return store.find(Collection.NODES, Utils.getIdFromPath(ROOT_PATH));
+ }
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
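A hedged sketch of driving the generic seeker directly; the time window mirrors what LastRevRecoveryAgent computes (the crashed node's root _lastRev up to its lease end plus the async delay), and all variables here are placeholders.

    // Sketch only.
    MissingLastRevSeeker seeker = new MissingLastRevSeeker(store);
    ClusterNodeInfoDocument info = seeker.getClusterNodeInfo(2);
    long start = seeker.getRoot().getLastRev().get(2).getTimestamp();
    long end = (Long) info.get(ClusterNodeInfo.LEASE_END_KEY) + asyncDelay;
    Iterable<NodeDocument> candidates = seeker.getCandidates(start, end);
    int restored = new LastRevRecoveryAgent(nodeStore).recover(candidates.iterator(), 2);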
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java (revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java (working copy)
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import static com.google.common.collect.Iterables.transform;
+
+import com.google.common.base.Function;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.QueryBuilder;
+import com.mongodb.ReadPreference;
+
+import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.apache.jackrabbit.oak.plugins.document.Commit;
+import org.apache.jackrabbit.oak.plugins.document.MissingLastRevSeeker;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
+
+/**
+ * Mongo specific version of MissingLastRevSeeker which uses mongo queries
+ * to fetch candidates which may have missed '_lastRev' updates.
+ *
+ * Uses a time range to find documents modified during that interval.
+ */
+public class MongoMissingLastRevSeeker extends MissingLastRevSeeker {
+ private final MongoDocumentStore store;
+
+ public MongoMissingLastRevSeeker(MongoDocumentStore store) {
+ super(store);
+ this.store = store;
+ }
+
+ @Override
+ public ClusterNodeInfoDocument getClusterNodeInfo(final int clusterId) {
+ DBObject query =
+ QueryBuilder
+ .start(NodeDocument.ID).is(String.valueOf(clusterId)).get();
+ DBCursor cursor =
+ getClusterNodeCollection().find(query)
+ .setReadPreference(
+ ReadPreference.secondaryPreferred());
+ DBObject obj = null;
+ if (cursor.hasNext()) {
+ obj = cursor.next();
+ return store.convertFromDBObject(Collection.CLUSTER_NODES, obj);
+ }
+
+ return null;
+ }
+
+ @Override
+ public CloseableIterable<NodeDocument> getCandidates(final long startTime,
+ final long endTime) {
+ DBObject query =
+ QueryBuilder
+ .start(NodeDocument.MODIFIED_IN_SECS).lessThanEquals(
+ Commit.getModifiedInSecs(endTime))
+ .put(NodeDocument.MODIFIED_IN_SECS).greaterThanEquals(
+ Commit.getModifiedInSecs(startTime))
+ .get();
+ DBObject sortFields = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, -1);
+
+ DBCursor cursor =
+ getNodeCollection().find(query)
+ .sort(sortFields)
+ .setReadPreference(
+ ReadPreference.secondaryPreferred());
+ return CloseableIterable.wrap(transform(cursor, new Function<DBObject, NodeDocument>() {
+ @Override
+ public NodeDocument apply(DBObject input) {
+ return store.convertFromDBObject(Collection.NODES, input);
+ }
+ }), cursor);
+ }
+
+ private DBCollection getNodeCollection() {
+ return store.getDBCollection(Collection.NODES);
+ }
+
+ private DBCollection getClusterNodeCollection() {
+ return store.getDBCollection(Collection.CLUSTER_NODES);
+ }
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoMissingLastRevSeeker.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
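Note on the Mongo variant: because getCandidates(...) wraps a live DBCursor in a CloseableIterable, callers that bypass LastRevRecoveryAgent should close it themselves, just as the agent's recoverCandidates(...) does in its finally block. A minimal consumption sketch (mongoSeeker, start and end are placeholders):

    // Sketch only: iterate candidates and always release the underlying cursor.
    Iterable<NodeDocument> candidates = mongoSeeker.getCandidates(start, end);
    try {
        for (NodeDocument doc : candidates) {
            // inspect doc
        }
    } finally {
        if (candidates instanceof Closeable) {
            try {
                ((Closeable) candidates).close();
            } catch (IOException ignore) {
                // best effort
            }
        }
    }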
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (revision 1583596)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/NodeDocument.java (working copy)
@@ -183,7 +183,7 @@
*
* Value: the revision.
*/
- private static final String LAST_REV = "_lastRev";
+ public static final String LAST_REV = "_lastRev";
/**
* Flag indicating that there are child nodes present. Its just used as a hint.
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java (revision 1583596)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/Revision.java (working copy)
@@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.jackrabbit.oak.stats.Clock;
/**
* A revision.
*/
@@ -58,6 +59,20 @@
*/
private final boolean branch;
+ /** Only set for testing */
+ private static Clock clock;
+
+ /**
+ * Only to be used for testing. Do not use otherwise.
+ *
+ * @param c the clock
+ */
+ protected static void setClock(Clock c) {
+ clock = c;
+ }
public Revision(long timestamp, int counter, int clusterId) {
this(timestamp, counter, clusterId, false);
}
@@ -150,6 +165,9 @@
*/
public static long getCurrentTimestamp() {
long timestamp = System.currentTimeMillis();
+ if (clock != null) {
+ timestamp = clock.getTime();
+ }
if (timestamp < lastTimestamp) {
// protect against decreases in the system time,
// time machines, and other fluctuations in the time continuum
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java (revision 1583596)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentStoreFixture.java (working copy)
@@ -134,5 +134,14 @@
return false;
}
}
+
+ @Override
+ public void dispose() {
+ try {
+ MongoConnection connection = new MongoConnection(uri);
+ connection.getDB().dropDatabase();
+ } catch (Exception e) {
+ // best effort cleanup - ignore if the test database cannot be dropped
+ }
+ }
}
}
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java (revision 1583596)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java (working copy)
@@ -90,7 +90,7 @@
//lastRev should not be updated for C #2
assertNull(y1.getLastRev().get(2));
- LastRevRecovery recovery = new LastRevRecovery(ds1);
+ LastRevRecoveryAgent recovery = new LastRevRecoveryAgent(ds1);
//Do not pass y1 but still y1 should be updated
recovery.recover(Iterators.forArray(x1,z1), 2);
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java (revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java (working copy)
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests the restore of potentially missing _lastRev updates.
+ */
+@RunWith(Parameterized.class)
+public class LastRevSingleNodeRecoveryTest {
+ private DocumentStoreFixture fixture;
+
+ private Clock clock;
+
+ private DocumentMK mk;
+
+ public LastRevSingleNodeRecoveryTest(DocumentStoreFixture fixture) {
+ this.fixture = fixture;
+ }
+
+ @Parameterized.Parameters
+ public static Collection