Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (revision 1729384)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (working copy)
@@ -443,6 +443,7 @@
int clusterNodeId = 0;
int maxId = 0;
ClusterNodeState state = ClusterNodeState.NONE;
+ RecoverLockState lockState = RecoverLockState.NONE;
Long prevLeaseEnd = null;
boolean newEntry = false;
@@ -519,6 +520,7 @@
clusterNodeId = id;
state = ClusterNodeState.fromString((String) doc.get(STATE));
prevLeaseEnd = leaseEnd;
+ lockState = RecoverLockState.fromString((String) doc.get(REV_RECOVERY_LOCK));
}
}
@@ -540,7 +542,7 @@
// Do not expire entries and stick on the earlier state, and leaseEnd so,
// that _lastRev recovery if needed is done.
return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state,
- RecoverLockState.NONE, prevLeaseEnd, newEntry);
+ lockState, prevLeaseEnd, newEntry);
}
private static boolean waitForLeaseExpiry(DocumentStore store, ClusterNodeInfoDocument cdoc, long leaseEnd, String machineId,
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1729384)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy)
@@ -26,6 +26,7 @@
import static com.google.common.collect.Lists.reverse;
import static java.util.Collections.singletonList;
import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES;
import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.FAST_DIFF;
@@ -147,6 +148,13 @@
Boolean.parseBoolean(System.getProperty("oak.fairBackgroundOperationLock", "true"));
/**
+ * The timeout in milliseconds to wait for the recovery performed by
+ * another cluster node.
+ */
+ private long recoveryWaitTimeoutMS =
+ Long.getLong("oak.recoveryWaitTimeoutMS", 60000);
+
+ /**
* The document store (might be used by multiple node stores).
*/
protected final DocumentStore store;
@@ -519,9 +527,25 @@
/**
* Recover _lastRev recovery if needed.
+ *
+ * @throws DocumentStoreException if recovery did not finish within
+ * {@link #recoveryWaitTimeoutMS}.
*/
- private void checkLastRevRecovery() {
- lastRevRecoveryAgent.recover(clusterId);
+ private void checkLastRevRecovery() throws DocumentStoreException {
+ long timeout = clock.getTime() + recoveryWaitTimeoutMS;
+ if (lastRevRecoveryAgent.recover(clusterId, timeout) == -1) {
+ ClusterNodeInfoDocument doc = store.find(CLUSTER_NODES, String.valueOf(clusterId));
+ String otherId = "n/a";
+ if (doc != null) {
+ otherId = String.valueOf(doc.get(ClusterNodeInfo.REV_RECOVERY_BY));
+ }
+ String msg = "This cluster node (" + clusterId + ") requires " +
+ "_lastRev recovery which is currently performed by " +
+ "another cluster node (" + otherId + "). Recovery is " +
+ "still ongoing after " + recoveryWaitTimeoutMS + " ms. " +
+ "Failing startup of this DocumentNodeStore now!";
+ throw new DocumentStoreException(msg);
+ }
}
public void dispose() {
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (revision 1729384)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (working copy)
@@ -40,6 +40,7 @@
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.plugins.document.util.MapFactory;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.stats.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,12 +66,32 @@
}
/**
- * Recover the correct _lastRev updates for potentially missing candidate nodes.
+ * Recover the correct _lastRev updates for potentially missing candidate
+ * nodes. If another cluster node is already performing the recovery for the
+ * given {@code clusterId}, this method will {@code waitUntil} the given
+ * time in milliseconds for the recovery to finish.
*
+ * This method will return:
+ *
+ * - {@code -1} when another cluster node is busy performing recovery
+ * for the given {@code clusterId} and the {@code waitUntil} time is
+ * reached.
+ * - {@code 0} when no recovery was needed or this thread waited
+ * for another cluster node to finish the recovery within the given
+ * {@code waitUntil} time.
+ * - A positive value for the number of recovered documents when
+ * recovery was performed by this thread / cluster node.
+ *
+ *
* @param clusterId the cluster id for which the _lastRev are to be recovered
- * @return the number of restored nodes
+ * @param waitUntil wait until this time is milliseconds for recovery of the
+ * given {@code clusterId} if another cluster node is
+ * already performing the recovery.
+ * @return the number of restored nodes or {@code -1} if a timeout occurred
+ * while waiting for an ongoing recovery by another cluster node.
*/
- public int recover(int clusterId) {
+ public int recover(int clusterId, long waitUntil)
+ throws DocumentStoreException {
ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId);
//TODO Currently leaseTime remains same per cluster node. If this
@@ -80,7 +101,7 @@
if (nodeInfo != null) {
// Check if _lastRev recovery needed for this cluster node
- // state is Active && recoveryLock not held by someone
+ // state is Active && current time past leaseEnd
if (isRecoveryNeeded(nodeInfo)) {
long leaseEnd = nodeInfo.getLeaseEndTime();
@@ -98,10 +119,7 @@
startTime = leaseEnd - leaseTime - asyncDelay;
}
- log.info("Recovering candidates modified after: [{}] for clusterId [{}]",
- Utils.timestampToString(startTime), clusterId);
-
- return recoverCandidates(clusterId, startTime);
+ return recoverCandidates(clusterId, startTime, waitUntil);
}
}
@@ -110,6 +128,18 @@
}
/**
+ * Same as {@link #recover(int, long)}, but does not wait for ongoing
+ * recovery.
+ *
+ * @param clusterId the cluster id for which the _lastRev are to be recovered
+ * @return the number of restored nodes or {@code -1} if there is an ongoing
+ * recovery by another cluster node.
+ */
+ public int recover(int clusterId) {
+ return recover(clusterId, 0);
+ }
+
+ /**
* Recover the correct _lastRev updates for the given candidate nodes.
*
* @param suspects the potential suspects
@@ -274,21 +304,52 @@
*
* @param clusterId the cluster id
* @param startTime the start time
- * @return the number of restored nodes
+ * @param waitUntil wait at most until this time for an ongoing recovery
+ * done by another cluster node.
+ * @return the number of restored nodes or {@code -1} if recovery is still
+ * ongoing even when {@code waitUntil} time was reached.
*/
- private int recoverCandidates(final int clusterId, final long startTime) {
+ private int recoverCandidates(final int clusterId,
+ final long startTime,
+ final long waitUntil) {
int myClusterId = nodeStore.getClusterId();
boolean lockAcquired = missingLastRevUtil.acquireRecoveryLock(clusterId, myClusterId);
- //TODO What if recovery is being performed for current clusterNode by some other node
- //should we halt the startup
- if(!lockAcquired){
- log.info("Last revision recovery already being performed by some other node. " +
- "Would not attempt recovery");
- return 0;
+ if (!lockAcquired) {
+ Clock clock = nodeStore.getClock();
+ if (clock.getTime() > waitUntil) {
+ // no need to wait for lock release, waitUntil already reached
+ return -1;
+ }
+ log.info("Last revision recovery already being performed by some " +
+ "other cluster node. Waiting at most until {} for " +
+ "recovery to finish.",
+ Utils.timestampToString(waitUntil));
+ while (clock.getTime() < waitUntil) {
+ // check once a second
+ long time = Math.min(waitUntil, clock.getTime() + 1000);
+ try {
+ clock.waitUntil(time);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ String msg = "Interrupted while waiting for _lastRev recovery to finish.";
+ throw new DocumentStoreException(msg, e);
+ }
+ ClusterNodeInfoDocument infoDoc = missingLastRevUtil.getClusterNodeInfo(clusterId);
+ if (!infoDoc.isBeingRecovered()) {
+ return 0;
+ }
+ log.info("Last revision recovery for clusterId {} still " +
+ "ongoing by cluster node {}", clusterId,
+ infoDoc.get(ClusterNodeInfo.REV_RECOVERY_BY));
+ }
+ return -1;
}
+ log.info("Recovering candidates modified after: [{}] for clusterId [{}]",
+ Utils.timestampToString(startTime), clusterId);
+
Iterable suspects = missingLastRevUtil.getCandidates(startTime);
log.info("Performing Last Revision Recovery for clusterNodeId {}", clusterId);
@@ -377,7 +438,7 @@
String id = nodeInfo.getId();
if (nodeInfo.isBeingRecovered()) {
Long recoveredBy = (Long) nodeInfo.get(ClusterNodeInfo.REV_RECOVERY_BY);
- beingRecoveredRightNow.add(nodeInfo == null ? id : String.format("%s (by %d)", id, recoveredBy));
+ beingRecoveredRightNow.add(recoveredBy == null ? id : String.format("%s (by %d)", id, recoveredBy));
} else if (isRecoveryNeeded(nodeInfo)) {
candidateClusterNodes.add(Integer.valueOf(id));
}
@@ -391,11 +452,11 @@
}
/**
- * Check if _lastRev recovery needed for this cluster node state is Active
- * && currentTime past the leaseEnd time && recoveryLock not held by someone
+ * Check if _lastRev recovery needed for this cluster node
+ * state is Active && currentTime past the leaseEnd time
*/
private boolean isRecoveryNeeded(@Nonnull ClusterNodeInfoDocument nodeInfo) {
- return nodeInfo.isActive() && nodeStore.getClock().getTime() > nodeInfo.getLeaseEndTime() && !nodeInfo.isBeingRecovered();
+ return nodeInfo.isActive() && nodeStore.getClock().getTime() > nodeInfo.getLeaseEndTime();
}
private static class ClusterPredicate implements Predicate {
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java (revision 1729384)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java (working copy)
@@ -42,6 +42,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class LastRevRecoveryTest {
@Rule
@@ -59,6 +60,7 @@
clock = new Clock.Virtual();
clock.waitUntil(System.currentTimeMillis());
Revision.setClock(clock);
+ ClusterNodeInfo.setClock(clock);
// disable lease check because we fiddle with the virtual clock
final boolean leaseCheck = false;
sharedStore = new MemoryDocumentStore();
@@ -85,6 +87,7 @@
public void tearDown() {
ds1.dispose();
ds2.dispose();
+ ClusterNodeInfo.resetClockToDefault();
Revision.resetClockToDefault();
}
@@ -164,7 +167,74 @@
0, agent.recover(c1Id));
}
+ // OAK-3488
+ @Test
+ public void recoveryWithTimeout() throws Exception {
+ String clusterId = String.valueOf(c1Id);
+ ClusterNodeInfoDocument doc = sharedStore.find(CLUSTER_NODES, clusterId);
+ NodeBuilder builder = ds1.getRoot().builder();
+ builder.child("x").child("y").child("z");
+ merge(ds1, builder);
+ ds1.dispose();
+
+ // reset clusterNodes entry to simulate a crash
+ sharedStore.remove(CLUSTER_NODES, clusterId);
+ sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(doc)));
+
+ // 'wait' until lease expires
+ clock.waitUntil(doc.getLeaseEndTime() + 1);
+
+ // simulate ongoing recovery by cluster node 2
+ MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore);
+ seeker.acquireRecoveryLock(c1Id, c2Id);
+
+ // run recovery from ds1
+ LastRevRecoveryAgent a1 = new LastRevRecoveryAgent(ds1);
+ // use current time -> do not wait for recovery of other agent
+ assertEquals(-1, a1.recover(c1Id, clock.getTime()));
+
+ seeker.releaseRecoveryLock(c1Id);
+
+ assertEquals(0, a1.recover(c1Id, clock.getTime() + 1000));
+ }
+
+ // OAK-3488
+ @Test
+ public void failStartupOnRecoveryTimeout() throws Exception {
+ String clusterId = String.valueOf(c1Id);
+ ClusterNodeInfoDocument doc = sharedStore.find(CLUSTER_NODES, clusterId);
+
+ NodeBuilder builder = ds1.getRoot().builder();
+ builder.child("x").child("y").child("z");
+ merge(ds1, builder);
+ ds1.dispose();
+
+ // reset clusterNodes entry to simulate a crash
+ sharedStore.remove(CLUSTER_NODES, clusterId);
+ sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(doc)));
+
+ // 'wait' until lease expires
+ clock.waitUntil(doc.getLeaseEndTime() + 1);
+
+ // simulate ongoing recovery by cluster node 2
+ MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore);
+ assertTrue(seeker.acquireRecoveryLock(c1Id, c2Id));
+
+ // attempt to restart ds1 while lock is acquired
+ try {
+ ds1 = new DocumentMK.Builder()
+ .clock(clock)
+ .setDocumentStore(sharedStore)
+ .setClusterId(c1Id)
+ .getNodeStore();
+ fail("DocumentStoreException expected");
+ } catch (DocumentStoreException e) {
+ // expected
+ }
+ seeker.releaseRecoveryLock(c1Id);
+ }
+
private NodeDocument getDocument(DocumentNodeStore nodeStore, String path) {
return nodeStore.getDocumentStore().find(Collection.NODES, Utils.getIdFromPath(path));
}