Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (revision 1583921) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java (working copy) @@ -192,10 +192,14 @@ private RecoverLockState revRecoveryLock; ClusterNodeInfo(int id, DocumentStore store, String machineId, String instanceId, ClusterNodeState state, - RecoverLockState revRecoveryLock) { + RecoverLockState revRecoveryLock, Long leaseEnd) { this.id = id; this.startTime = getCurrentTime(); - this.leaseEndTime = startTime; + if (leaseEnd == null) { + this.leaseEndTime = startTime; + } else { + this.leaseEndTime = leaseEnd; + } this.store = store; this.machineId = machineId; this.instanceId = instanceId; @@ -214,7 +218,7 @@ * @return the cluster node info */ public static ClusterNodeInfo getInstance(DocumentStore store) { - return getInstance(store, MACHINE_ID, WORKING_DIR); + return getInstance(store, MACHINE_ID, WORKING_DIR, false); } /** @@ -227,6 +231,20 @@ */ public static ClusterNodeInfo getInstance(DocumentStore store, String machineId, String instanceId) { + return getInstance(store, machineId, instanceId, true); + } + + /** + * Create a cluster node info instance for the store. 
+ * + * @param store the document store (for the lease) + * @param machineId the machine id (null for MAC address) + * @param instanceId the instance id (null for current working directory) + * @param updateLease whether to update the lease + * @return the cluster node info + */ + public static ClusterNodeInfo getInstance(DocumentStore store, String machineId, + String instanceId, boolean updateLease) { if (machineId == null) { machineId = MACHINE_ID; } @@ -239,15 +257,18 @@ update.set(ID, String.valueOf(clusterNode.id)); update.set(MACHINE_ID_KEY, clusterNode.machineId); update.set(INSTANCE_ID_KEY, clusterNode.instanceId); - update.set(LEASE_END_KEY, getCurrentTime() + clusterNode.leaseTime); + if (updateLease) { + update.set(LEASE_END_KEY, getCurrentTime() + clusterNode.leaseTime); + } else { + update.set(LEASE_END_KEY, clusterNode.leaseEndTime); + } update.set(INFO_KEY, clusterNode.toString()); update.set(STATE, clusterNode.state.name()); update.set(REV_RECOVERY_LOCK, clusterNode.revRecoveryLock.name()); - boolean success = - store.create(Collection.CLUSTER_NODES, Collections.singletonList(update)); - if (success) { - return clusterNode; - } + + // No expiration of earlier cluster info, so update + store.createOrUpdate(Collection.CLUSTER_NODES, update); + return clusterNode; } throw new MicroKernelException("Could not get cluster node info"); } @@ -262,6 +283,8 @@ int clusterNodeId = 0; int maxId = 0; ClusterNodeState state = ClusterNodeState.NONE; + Long prevLeaseEnd = null; + for (Document doc : list) { String key = doc.getId(); int id; @@ -288,18 +311,22 @@ // a different machine or instance continue; } - // remove expired matching entries - store.remove(Collection.CLUSTER_NODES, key); + if (clusterNodeId == 0 || id < clusterNodeId) { // if there are multiple, use the smallest value clusterNodeId = id; state = ClusterNodeState.fromString((String) doc.get(STATE)); + prevLeaseEnd = leaseEnd; } } if (clusterNodeId == 0) { clusterNodeId = maxId + 1; } - return 
new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state, RecoverLockState.NONE); + + // Do not expire entries; keep the earlier state and leaseEnd so + // that _lastRev recovery is done if needed. + return new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, state, + RecoverLockState.NONE, prevLeaseEnd); } /** Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (revision 1583921) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (working copy) @@ -382,6 +382,9 @@ "DocumentNodeStore background thread"); backgroundThread.setDaemon(true); checkLastRevRecovery(); + // Renew the lease because it may have been stale + backgroundRenewClusterIdLease(); + backgroundThread.start(); LOG.info("Initialized DocumentNodeStore with clusterNodeId: {}", clusterId); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (revision 1583921) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java (working copy) @@ -24,6 +24,7 @@ import static com.google.common.collect.Iterables.mergeSorted; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; @@ -30,6 +31,7 @@ import javax.annotation.CheckForNull; import com.google.common.base.Predicate; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.jackrabbit.oak.commons.PathUtils; @@ -74,10 +76,8 @@ long leaseEnd = nodeInfo.getLeaseEndTime(); // Check if _lastRev recovery needed for this cluster node - // state 
== null && recoveryLock not held by someone - if (nodeInfo.isActive() - && !nodeInfo.isBeingRecovered()) { - + // state is Active && recoveryLock not held by someone + if (isRecoveryNeeded(nodeInfo)) { // retrieve the root document's _lastRev NodeDocument root = missingLastRevUtil.getRoot(); Revision lastRev = root.getLastRev().get(clusterId); @@ -260,7 +260,39 @@ } return null; } + + /** + * Gets the _lastRev recovery candidate cluster nodes. + * + * @return the recovery candidate nodes + */ + public Iterable getRecoveryCandidateNodes() { + Iterable clusters = missingLastRevUtil.getClusters(); + List candidateClusterNodes = Lists.newArrayList(); + + for (ClusterNodeInfoDocument nodeInfo : clusters) { + if (isRecoveryNeeded(nodeInfo)) { + candidateClusterNodes.add(nodeInfo.getId()); + } + } + + return candidateClusterNodes; + } + + private boolean isRecoveryNeeded(ClusterNodeInfoDocument nodeInfo) { + if (nodeInfo != null) { + // Check if _lastRev recovery needed for this cluster node + // state is Active && currentTime past the leaseEnd time && recoveryLock not held by someone + if (nodeInfo.isActive() + && nodeStore.getClock().getTime() > nodeInfo.getLeaseEndTime() + && !nodeInfo.isBeingRecovered()) { + return true; + } + } + return false; + } + private static class ClusterPredicate implements Predicate { private final int clusterId; Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (revision 1583921) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/MissingLastRevSeeker.java (working copy) @@ -39,8 +39,18 @@ public MissingLastRevSeeker(DocumentStore store) { this.store = store; } - + /** + * Gets the clusters which potentially need _lastRev recovery. 
+ * + * @return the clusters + */ + public Iterable getClusters() { + return store.query(Collection.CLUSTER_NODES, "0", + "a", Integer.MAX_VALUE); + } + + /** * Gets the cluster node info for the given cluster node id. * * @param clusterId the cluster id Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java (revision 1583921) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterInfoTest.java (working copy) @@ -24,6 +24,7 @@ import java.util.List; import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore; +import org.apache.jackrabbit.oak.stats.Clock; import org.junit.Test; import com.mongodb.ReadPreference; @@ -38,6 +39,10 @@ public void readWriteMode() throws InterruptedException { MemoryDocumentStore mem = new MemoryDocumentStore(); + Clock clock = new Clock.Virtual(); + clock.waitUntil(System.currentTimeMillis()); + ClusterNodeInfo.setClock(clock); + DocumentNodeStore ns1 = new DocumentMK.Builder(). setDocumentStore(mem). setAsyncDelay(0). @@ -46,8 +51,13 @@ setDocumentStore(mem). setAsyncDelay(0). getNodeStore(); + // Bring the current time forward to after the leaseTime which would have been + // updated in the DocumentNodeStore initialization. 
+ clock.waitUntil(clock.getTime() + ns1.getClusterInfo().getLeaseTime()); + ns1.getClusterInfo().setLeaseTime(0); ns2.getClusterInfo().setLeaseTime(0); + List list = mem.query( Collection.CLUSTER_NODES, "0", "a", Integer.MAX_VALUE); assertEquals(2, list.size()); @@ -75,6 +85,7 @@ assertEquals(ReadPreference.nearest(), mem.getReadPreference()); assertEquals(WriteConcern.FSYNCED, mem.getWriteConcern()); + ClusterNodeInfo.setClock(null); ns1.dispose(); ns2.dispose(); } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java (revision 1583921) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/LastRevSingleNodeRecoveryTest.java (working copy) @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Collection; +import java.util.Iterator; import java.util.List; import com.google.common.collect.Lists; @@ -100,6 +101,8 @@ @Test public void testLastRevRestoreOnNodeStart() throws Exception { + clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10); + // pending updates setupScenario(); @@ -113,9 +116,6 @@ mk = openMK(0, mk.getNodeStore().getDocumentStore()); int pendingCount = mk.getPendingWriteCount(); - - // so that the current time is more than the current lease end - clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000); // Immediately check again, now should not have done any changes. 
LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent(); /** Now there should have been pendingCount updates **/ @@ -124,6 +124,7 @@ @Test public void testLastRevRestore() throws Exception { + clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10); setupScenario(); int pendingCount = mk.getPendingWriteCount(); @@ -138,6 +139,7 @@ @Test public void testNoMissingUpdates() throws Exception { + clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10); setupScenario(); mk.backgroundWrite(); @@ -158,7 +160,21 @@ /** There should have been no updates **/ assertEquals(pendingCount, recoveryAgent.recover(mk.getClusterInfo().getId())); } + + @Test + public void testNodeRecoveryNeeded() throws InterruptedException { + clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 10); + setupScenario(); + // so that the current time is more than the current lease end + clock.waitUntil(clock.getTime() + mk.getClusterInfo().getLeaseTime() + 1000); + + LastRevRecoveryAgent recoveryAgent = mk.getNodeStore().getLastRevRecoveryAgent(); + Iterator iter = recoveryAgent.getRecoveryCandidateNodes().iterator(); + assertEquals(String.valueOf(1), iter.next()); + assertEquals(false, iter.hasNext()); + } + private void setupScenario() throws InterruptedException { // add some nodes which won't be returned mk.commit("/", "+\"u\" : { \"v\": {}}", null, null); @@ -210,4 +226,3 @@ fixture.dispose(); } } -