Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java	(revision 1693866)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java	(working copy)
@@ -177,7 +177,7 @@
     /**
      * The time (in milliseconds UTC) where the lease of this instance ends.
      */
-    private long leaseEndTime;
+    private volatile long leaseEndTime;
 
     /**
      * The read/write mode.
@@ -188,8 +188,20 @@
      * The state of the cluter node.
      */
     private ClusterNodeState state;
+    
+    /**
+     * Whether or not the OAK-2739/leaseCheck failed and thus a System.exit was already triggered
+     * (is used to avoid calling System.exit a hundred times when it then happens)
+     */
+    private volatile boolean systemExitTriggered;
 
     /**
+     * Tracks the fact whether the lease has *ever* been renewed by this instance
+     * or has just be read from the document store at initialization time.
+     */
+    private boolean renewed;
+    
+    /**
      * The revLock value of the cluster;
      */
     private RecoverLockState revRecoveryLock;
@@ -212,6 +224,7 @@
         } else {
             this.leaseEndTime = leaseEnd;
         }
+        this.renewed = false; // will be updated once we renew it the first time
         this.store = store;
         this.machineId = machineId;
         this.instanceId = instanceId;
@@ -356,6 +369,49 @@
                 RecoverLockState.NONE, prevLeaseEnd, newEntry);
     }
 
+    public void performLeaseCheck() {
+        if (!renewed) {
+            // the 'renewed' flag indicates if this instance *ever* renewed the lease after startup
+            // until that is not set, we cannot do the lease check (otherwise startup wouldn't work)
+            return;
+        }
+        final long now = getCurrentTime();
+        if (now < leaseEndTime) {
+            // then all is good
+            return;
+        }
+        
+        // OAK-2739 : when the lease is not current, we must stop
+        // the instance immediately to avoid any cluster inconsistency
+        final String errorMsg = "performLeaseCheck: this instance failed to update the lease in time "
+                + "(leaseEndTime: "+leaseEndTime+", now: "+now+", leaseTime: "+leaseTime+") "
+                + "and is thus no longer eligible for taking part in the cluster";
+        LOG.error(errorMsg);
+        
+        // now here comes the thing: we should a) call System.exit in a separate thread
+        // to avoid any deadlock when calling from eg within the shutdown hook
+        // AND b) we should not call system.exit hundred times.
+        // so for b) we use 'systemExitTriggered' to avoid calling it over and over
+        // BUT it doesn't have to be 100% ensured that system.exit is called only once.
+        // it is fine if it gets called once, twice - but just not hundred times.
+        // which is a long way of saying: volatile is fine here - and the 'if' too
+        if (!systemExitTriggered) {
+            systemExitTriggered = true;
+            final Runnable r = new Runnable() {
+
+                @Override
+                public void run() {
+                    System.exit(-1);
+                }
+                
+            };
+            final Thread th = new Thread(r);
+            th.setDaemon(true);
+            th.start();
+        }
+        throw new AssertionError(errorMsg);
+    }
+
     /**
      * Renew the cluster id lease. This method needs to be called once in a while,
      * to ensure the same cluster id is not re-used by a different instance.
@@ -379,6 +435,7 @@
             readWriteMode = mode;
             store.setReadWriteMode(mode);
         }
+        renewed = true;
         return true;
     }
 
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java	(revision 1693866)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java	(working copy)
@@ -474,6 +474,7 @@
         private int asyncDelay = 1000;
         private boolean timing;
         private boolean logging;
+        private boolean leaseCheck;
         private Weigher<CacheValue, CacheValue> weigher = new EmpiricalWeigher();
         private long memoryCacheSize = DEFAULT_MEMORY_CACHE_SIZE;
         private int nodeCachePercentage = DEFAULT_NODE_CACHE_PERCENTAGE;
@@ -604,6 +605,15 @@
         public boolean getLogging() {
             return logging;
         }
+        
+        public Builder setLeaseCheck(boolean leaseCheck) {
+            this.leaseCheck = leaseCheck;
+            return this;
+        }
+        
+        public boolean getLeaseCheck() {
+            return leaseCheck;
+        }
 
         /**
          * Set the document store to use. By default an in-memory store is used.
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java	(revision 1693866)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java	(working copy)
@@ -372,7 +372,8 @@
                         diffCachePercentage).
                 setCacheSegmentCount(cacheSegmentCount).
                 setCacheStackMoveDistance(cacheStackMoveDistance).
-                offHeapCacheSize(offHeapCache * MB);
+                offHeapCacheSize(offHeapCache * MB).
+                setLeaseCheck(true /* OAK-2739: enabled by default */);
 
         if (persistentCache != null && persistentCache.length() > 0) {
             mkBuilder.setPersistentCache(persistentCache);
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java	(revision 0)
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.util;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.jackrabbit.oak.cache.CacheStats;
+import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
+import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
+import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
+
+/**
+ * Wrapper of another DocumentStore that does a lease check on any method
+ * invocation (read or update) and fails if the lease is not valid.
+ * <p>
+ * @see https://issues.apache.org/jira/browse/OAK-2739 for more details
+ */
+public final class LeaseCheckDocumentStoreWrapper implements DocumentStore {
+    
+    private final DocumentStore delegate;
+    private final ClusterNodeInfo clusterNodeInfo;
+
+    public LeaseCheckDocumentStoreWrapper(final DocumentStore delegate, 
+            final ClusterNodeInfo clusterNodeInfo) {
+        if (delegate==null) {
+            throw new IllegalArgumentException("delegate must not be null");
+        }
+        if (clusterNodeInfo==null) {
+            throw new IllegalArgumentException("clusterNodeInfo must not be null");
+        }
+        this.delegate = delegate;
+        this.clusterNodeInfo = clusterNodeInfo;
+    }
+    
+    private final void performLeaseCheck() {
+        clusterNodeInfo.performLeaseCheck();
+    }
+    
+    @Override
+    public final <T extends Document> T find(Collection<T> collection, String key) {
+        performLeaseCheck();
+        return delegate.find(collection, key);
+    }
+
+    @Override
+    public final <T extends Document> T find(Collection<T> collection, String key,
+            int maxCacheAge) {
+        performLeaseCheck();
+        return delegate.find(collection, key, maxCacheAge);
+    }
+
+    @Override
+    public final <T extends Document> List<T> query(Collection<T> collection,
+            String fromKey, String toKey, int limit) {
+        performLeaseCheck();
+        return delegate.query(collection, fromKey, toKey, limit);
+    }
+
+    @Override
+    public final <T extends Document> List<T> query(Collection<T> collection,
+            String fromKey, String toKey, String indexedProperty,
+            long startValue, int limit) {
+        performLeaseCheck();
+        return delegate.query(collection, fromKey, toKey, indexedProperty, startValue, limit);
+    }
+
+    @Override
+    public final <T extends Document> void remove(Collection<T> collection, String key) {
+        performLeaseCheck();
+        delegate.remove(collection, key);
+    }
+
+    @Override
+    public final <T extends Document> void remove(Collection<T> collection,
+            List<String> keys) {
+        performLeaseCheck();
+        delegate.remove(collection, keys);
+    }
+
+    @Override
+    public final <T extends Document> int remove(Collection<T> collection,
+            Map<String, Map<Key, Condition>> toRemove) {
+        performLeaseCheck();
+        return delegate.remove(collection, toRemove);
+    }
+
+    @Override
+    public final <T extends Document> boolean create(Collection<T> collection,
+            List<UpdateOp> updateOps) {
+        performLeaseCheck();
+        return delegate.create(collection, updateOps);
+    }
+
+    @Override
+    public final <T extends Document> void update(Collection<T> collection,
+            List<String> keys, UpdateOp updateOp) {
+        performLeaseCheck();
+        delegate.update(collection, keys, updateOp);
+    }
+
+    @Override
+    public final <T extends Document> T createOrUpdate(Collection<T> collection,
+            UpdateOp update) {
+        performLeaseCheck();
+        return delegate.createOrUpdate(collection, update);
+    }
+
+    @Override
+    public final <T extends Document> T findAndUpdate(Collection<T> collection,
+            UpdateOp update) {
+        performLeaseCheck();
+        return delegate.findAndUpdate(collection, update);
+    }
+
+    @Override
+    public final CacheInvalidationStats invalidateCache() {
+        performLeaseCheck();
+        return delegate.invalidateCache();
+    }
+
+    @Override
+    public final CacheInvalidationStats invalidateCache(Iterable<String> keys) {
+        performLeaseCheck();
+        return delegate.invalidateCache(keys);
+    }
+
+    @Override
+    public final <T extends Document> void invalidateCache(Collection<T> collection,
+            String key) {
+        performLeaseCheck();
+        delegate.invalidateCache(collection, key);
+    }
+
+    @Override
+    public final void dispose() {
+        // this is debatable whether or not a lease check should be done on dispose.
+        // I'd say the lease must still be valid as on dispose there could be
+        // stuff written to the document store which should only be done
+        // when the lease is valid
+        performLeaseCheck();
+        delegate.dispose();
+    }
+
+    @Override
+    public final <T extends Document> T getIfCached(Collection<T> collection,
+            String key) {
+        performLeaseCheck();
+        return delegate.getIfCached(collection, key);
+    }
+
+    @Override
+    public final void setReadWriteMode(String readWriteMode) {
+        performLeaseCheck();
+        delegate.setReadWriteMode(readWriteMode);
+    }
+
+    @Override
+    public final CacheStats getCacheStats() {
+        performLeaseCheck();
+        return delegate.getCacheStats();
+    }
+
+    @Override
+    public final Map<String, String> getMetadata() {
+        performLeaseCheck();
+        return delegate.getMetadata();
+    }
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/LeaseCheckDocumentStoreWrapper.java
___________________________________________________________________
Added: svn:eol-style
   + native

Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	(revision 1693866)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	(working copy)
@@ -96,6 +96,7 @@
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.json.BlobSerializer;
 import org.apache.jackrabbit.oak.plugins.document.Branch.BranchCommit;
+import org.apache.jackrabbit.oak.plugins.document.util.LeaseCheckDocumentStoreWrapper;
 import org.apache.jackrabbit.oak.plugins.document.util.LoggingDocumentStoreWrapper;
 import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
 import org.apache.jackrabbit.oak.plugins.document.util.TimingDocumentStoreWrapper;
@@ -411,14 +412,13 @@
         if (builder.getLogging()) {
             s = new LoggingDocumentStoreWrapper(s);
         }
-        this.store = s;
         this.changes = Collection.JOURNAL.newDocument(s);
         this.executor = builder.getExecutor();
         this.clock = builder.getClock();
         int cid = builder.getClusterId();
         cid = Integer.getInteger("oak.documentMK.clusterId", cid);
         if (cid == 0) {
-            clusterNodeInfo = ClusterNodeInfo.getInstance(store);
+            clusterNodeInfo = ClusterNodeInfo.getInstance(s);
             // TODO we should ensure revisions generated from now on
             // are never "older" than revisions already in the repository for
             // this cluster id
@@ -426,6 +426,10 @@
         } else {
             clusterNodeInfo = null;
         }
+        if (builder.getLeaseCheck()) {
+            s = new LeaseCheckDocumentStoreWrapper(s, clusterNodeInfo);
+        }
+        this.store = s;
         this.clusterId = cid;
         this.revisionComparator = new Revision.RevisionComparator(clusterId);
         this.branches = new UnmergedBranches(getRevisionComparator());
