Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (revision 1729384) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (working copy) @@ -443,6 +443,7 @@ int clusterNodeId = 0; int maxId = 0; ClusterNodeState state = ClusterNodeState.NONE; + RecoverLockState lockState = RecoverLockState.NONE; Long prevLeaseEnd = null; boolean newEntry = false; @@ -519,6 +520,7 @@ clusterNodeId = id; state = ClusterNodeState.fromString((String) doc.get(STATE)); prevLeaseEnd = leaseEnd; + lockState = RecoverLockState.fromString((String) doc.get(REV_RECOVERY_LOCK)); } } @@ -540,7 +542,7 @@ // Do not expire entries and stick on the earlier state, and leaseEnd so, // that _lastRev recovery if needed is done. return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state, - RecoverLockState.NONE, prevLeaseEnd, newEntry); + lockState, prevLeaseEnd, newEntry); } private static boolean waitForLeaseExpiry(DocumentStore store, ClusterNodeInfoDocument cdoc, long leaseEnd, String machineId, Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1729384) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy) @@ -26,6 +26,7 @@ import static com.google.common.collect.Lists.reverse; import static java.util.Collections.singletonList; import static org.apache.jackrabbit.oak.commons.PathUtils.concat; +import static org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES; import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL; import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; import static org.apache.jackrabbit.oak.plugins.document.DocumentMK.FAST_DIFF; @@ -147,6 +148,13 @@ Boolean.parseBoolean(System.getProperty("oak.fairBackgroundOperationLock", "true")); /** + * The timeout in milliseconds to wait for the recovery performed by + * another cluster node. + */ + private long recoveryWaitTimeoutMS = + Long.getLong("oak.recoveryWaitTimeoutMS", 60000); + + /** * The document store (might be used by multiple node stores). */ protected final DocumentStore store; @@ -519,9 +527,25 @@ /** * Recover _lastRev recovery if needed. + * + * @throws DocumentStoreException if recovery did not finish within + * {@link #recoveryWaitTimeoutMS}. */ - private void checkLastRevRecovery() { - lastRevRecoveryAgent.recover(clusterId); + private void checkLastRevRecovery() throws DocumentStoreException { + long timeout = clock.getTime() + recoveryWaitTimeoutMS; + if (lastRevRecoveryAgent.recover(clusterId, timeout) == -1) { + ClusterNodeInfoDocument doc = store.find(CLUSTER_NODES, String.valueOf(clusterId)); + String otherId = "n/a"; + if (doc != null) { + otherId = String.valueOf(doc.get(ClusterNodeInfo.REV_RECOVERY_BY)); + } + String msg = "This cluster node (" + clusterId + ") requires " + + "_lastRev recovery which is currently performed by " + + "another cluster node (" + otherId + "). Recovery is " + + "still ongoing after " + recoveryWaitTimeoutMS + " ms. " + + "Failing startup of this DocumentNodeStore now!"; + throw new DocumentStoreException(msg); + } } public void dispose() { Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (revision 1729384) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (working copy) @@ -40,6 +40,7 @@ import org.apache.jackrabbit.oak.commons.PathUtils; import org.apache.jackrabbit.oak.plugins.document.util.MapFactory; import org.apache.jackrabbit.oak.plugins.document.util.Utils; +import org.apache.jackrabbit.oak.stats.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,12 +66,32 @@ } /** - * Recover the correct _lastRev updates for potentially missing candidate nodes. + * Recover the correct _lastRev updates for potentially missing candidate + * nodes. If another cluster node is already performing the recovery for the + * given {@code clusterId}, this method will {@code waitUntil} the given + * time in milliseconds for the recovery to finish. * + * This method will return: + * + * * @param clusterId the cluster id for which the _lastRev are to be recovered - * @return the number of restored nodes + * @param waitUntil wait until this time is milliseconds for recovery of the + * given {@code clusterId} if another cluster node is + * already performing the recovery. + * @return the number of restored nodes or {@code -1} if a timeout occurred + * while waiting for an ongoing recovery by another cluster node. */ - public int recover(int clusterId) { + public int recover(int clusterId, long waitUntil) + throws DocumentStoreException { ClusterNodeInfoDocument nodeInfo = missingLastRevUtil.getClusterNodeInfo(clusterId); //TODO Currently leaseTime remains same per cluster node. If this @@ -80,7 +101,7 @@ if (nodeInfo != null) { // Check if _lastRev recovery needed for this cluster node - // state is Active && recoveryLock not held by someone + // state is Active && current time past leaseEnd if (isRecoveryNeeded(nodeInfo)) { long leaseEnd = nodeInfo.getLeaseEndTime(); @@ -98,10 +119,7 @@ startTime = leaseEnd - leaseTime - asyncDelay; } - log.info("Recovering candidates modified after: [{}] for clusterId [{}]", - Utils.timestampToString(startTime), clusterId); - - return recoverCandidates(clusterId, startTime); + return recoverCandidates(clusterId, startTime, waitUntil); } } @@ -110,6 +128,18 @@ } /** + * Same as {@link #recover(int, long)}, but does not wait for ongoing + * recovery. + * + * @param clusterId the cluster id for which the _lastRev are to be recovered + * @return the number of restored nodes or {@code -1} if there is an ongoing + * recovery by another cluster node. + */ + public int recover(int clusterId) { + return recover(clusterId, 0); + } + + /** * Recover the correct _lastRev updates for the given candidate nodes. * * @param suspects the potential suspects @@ -274,21 +304,52 @@ * * @param clusterId the cluster id * @param startTime the start time - * @return the number of restored nodes + * @param waitUntil wait at most until this time for an ongoing recovery + * done by another cluster node. + * @return the number of restored nodes or {@code -1} if recovery is still + * ongoing even when {@code waitUntil} time was reached. */ - private int recoverCandidates(final int clusterId, final long startTime) { + private int recoverCandidates(final int clusterId, + final long startTime, + final long waitUntil) { int myClusterId = nodeStore.getClusterId(); boolean lockAcquired = missingLastRevUtil.acquireRecoveryLock(clusterId, myClusterId); - //TODO What if recovery is being performed for current clusterNode by some other node - //should we halt the startup - if(!lockAcquired){ - log.info("Last revision recovery already being performed by some other node. " + - "Would not attempt recovery"); - return 0; + if (!lockAcquired) { + Clock clock = nodeStore.getClock(); + if (clock.getTime() > waitUntil) { + // no need to wait for lock release, waitUntil already reached + return -1; + } + log.info("Last revision recovery already being performed by some " + + "other cluster node. Waiting at most until {} for " + + "recovery to finish.", + Utils.timestampToString(waitUntil)); + while (clock.getTime() < waitUntil) { + // check once a second + long time = Math.min(waitUntil, clock.getTime() + 1000); + try { + clock.waitUntil(time); + } catch (InterruptedException e) { + Thread.interrupted(); + String msg = "Interrupted while waiting for _lastRev recovery to finish."; + throw new DocumentStoreException(msg, e); + } + ClusterNodeInfoDocument infoDoc = missingLastRevUtil.getClusterNodeInfo(clusterId); + if (!infoDoc.isBeingRecovered()) { + return 0; + } + log.info("Last revision recovery for clusterId {} still " + + "ongoing by cluster node {}", clusterId, + infoDoc.get(ClusterNodeInfo.REV_RECOVERY_BY)); + } + return -1; } + log.info("Recovering candidates modified after: [{}] for clusterId [{}]", + Utils.timestampToString(startTime), clusterId); + Iterable suspects = missingLastRevUtil.getCandidates(startTime); log.info("Performing Last Revision Recovery for clusterNodeId {}", clusterId); @@ -377,7 +438,7 @@ String id = nodeInfo.getId(); if (nodeInfo.isBeingRecovered()) { Long recoveredBy = (Long) nodeInfo.get(ClusterNodeInfo.REV_RECOVERY_BY); - beingRecoveredRightNow.add(nodeInfo == null ? id : String.format("%s (by %d)", id, recoveredBy)); + beingRecoveredRightNow.add(recoveredBy == null ? id : String.format("%s (by %d)", id, recoveredBy)); } else if (isRecoveryNeeded(nodeInfo)) { candidateClusterNodes.add(Integer.valueOf(id)); } @@ -391,11 +452,11 @@ } /** - * Check if _lastRev recovery needed for this cluster node state is Active - * && currentTime past the leaseEnd time && recoveryLock not held by someone + * Check if _lastRev recovery needed for this cluster node + * state is Active && currentTime past the leaseEnd time */ private boolean isRecoveryNeeded(@Nonnull ClusterNodeInfoDocument nodeInfo) { - return nodeInfo.isActive() && nodeStore.getClock().getTime() > nodeInfo.getLeaseEndTime() && !nodeInfo.isBeingRecovered(); + return nodeInfo.isActive() && nodeStore.getClock().getTime() > nodeInfo.getLeaseEndTime(); } private static class ClusterPredicate implements Predicate { Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java (revision 1729384) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryTest.java (working copy) @@ -42,6 +42,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class LastRevRecoveryTest { @Rule @@ -59,6 +60,7 @@ clock = new Clock.Virtual(); clock.waitUntil(System.currentTimeMillis()); Revision.setClock(clock); + ClusterNodeInfo.setClock(clock); // disable lease check because we fiddle with the virtual clock final boolean leaseCheck = false; sharedStore = new MemoryDocumentStore(); @@ -85,6 +87,7 @@ public void tearDown() { ds1.dispose(); ds2.dispose(); + ClusterNodeInfo.resetClockToDefault(); Revision.resetClockToDefault(); } @@ -164,7 +167,74 @@ 0, agent.recover(c1Id)); } + // OAK-3488 + @Test + public void recoveryWithTimeout() throws Exception { + String clusterId = String.valueOf(c1Id); + ClusterNodeInfoDocument doc = sharedStore.find(CLUSTER_NODES, clusterId); + NodeBuilder builder = ds1.getRoot().builder(); + builder.child("x").child("y").child("z"); + merge(ds1, builder); + ds1.dispose(); + + // reset clusterNodes entry to simulate a crash + sharedStore.remove(CLUSTER_NODES, clusterId); + sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(doc))); + + // 'wait' until lease expires + clock.waitUntil(doc.getLeaseEndTime() + 1); + + // simulate ongoing recovery by cluster node 2 + MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore); + seeker.acquireRecoveryLock(c1Id, c2Id); + + // run recovery from ds1 + LastRevRecoveryAgent a1 = new LastRevRecoveryAgent(ds1); + // use current time -> do not wait for recovery of other agent + assertEquals(-1, a1.recover(c1Id, clock.getTime())); + + seeker.releaseRecoveryLock(c1Id); + + assertEquals(0, a1.recover(c1Id, clock.getTime() + 1000)); + } + + // OAK-3488 + @Test + public void failStartupOnRecoveryTimeout() throws Exception { + String clusterId = String.valueOf(c1Id); + ClusterNodeInfoDocument doc = sharedStore.find(CLUSTER_NODES, clusterId); + + NodeBuilder builder = ds1.getRoot().builder(); + builder.child("x").child("y").child("z"); + merge(ds1, builder); + ds1.dispose(); + + // reset clusterNodes entry to simulate a crash + sharedStore.remove(CLUSTER_NODES, clusterId); + sharedStore.create(CLUSTER_NODES, newArrayList(updateOpFromDocument(doc))); + + // 'wait' until lease expires + clock.waitUntil(doc.getLeaseEndTime() + 1); + + // simulate ongoing recovery by cluster node 2 + MissingLastRevSeeker seeker = new MissingLastRevSeeker(sharedStore); + assertTrue(seeker.acquireRecoveryLock(c1Id, c2Id)); + + // attempt to restart ds1 while lock is acquired + try { + ds1 = new DocumentMK.Builder() + .clock(clock) + .setDocumentStore(sharedStore) + .setClusterId(c1Id) + .getNodeStore(); + fail("DocumentStoreException expected"); + } catch (DocumentStoreException e) { + // expected + } + seeker.releaseRecoveryLock(c1Id); + } + private NodeDocument getDocument(DocumentNodeStore nodeStore, String path) { return nodeStore.getDocumentStore().find(Collection.NODES, Utils.getIdFromPath(path)); }