diff --git a/oak-core/pom.xml b/oak-core/pom.xml
index 7f9ec87..7175fc3 100644
--- a/oak-core/pom.xml
+++ b/oak-core/pom.xml
@@ -315,6 +315,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>junit-addons</groupId>
+      <artifactId>junit-addons</artifactId>
+      <version>1.4</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
       <version>1.9.5</version>
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
index afdefe5..5dfdda3 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -55,6 +56,7 @@ import com.google.common.io.Closer;
 
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.api.Descriptors;
 import org.apache.jackrabbit.oak.api.Root;
 import org.apache.jackrabbit.oak.api.jmx.QueryEngineSettingsMBean;
 import org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean;
@@ -97,9 +99,12 @@ import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
 import org.apache.jackrabbit.oak.spi.whiteboard.DefaultWhiteboard;
 import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Tracker;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardAware;
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
+import org.apache.jackrabbit.oak.util.AggregatingDescriptors;
+import org.apache.jackrabbit.oak.util.GenericDescriptors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -611,13 +616,29 @@ public class Oak {
         regs.add(registerMBean(whiteboard, RepositoryManagementMBean.class, repositoryManager,
                 RepositoryManagementMBean.TYPE, repositoryManager.getName()));
 
+        final Tracker<Descriptors> t = whiteboard.track(Descriptors.class);
+        final List<Descriptors> descriptorServices = t.getServices();
+        Descriptors additionalDescriptors = null;
+        if (descriptorServices!=null && descriptorServices.size()>0) {
+            if (descriptorServices.size()==1) {
+                additionalDescriptors = descriptorServices.get(0);
+            } else {
+                AggregatingDescriptors aggregator = new AggregatingDescriptors();
+                for (Iterator<Descriptors> it = descriptorServices.iterator(); it.hasNext();) {
+                    aggregator.add(it.next());
+                }
+                additionalDescriptors = aggregator;
+            }
+        }
+
         return new ContentRepositoryImpl(
                 store,
                 CompositeHook.compose(commitHooks),
                 defaultWorkspaceName,
                 queryEngineSettings,
                 indexProvider,
-                securityProvider) {
+                securityProvider,
+                additionalDescriptors) {
             @Override
             public void close() throws IOException {
                 super.close();
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentRepositoryImpl.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentRepositoryImpl.java
index ef7a56d..69f9df5 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentRepositoryImpl.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentRepositoryImpl.java
@@ -116,6 +116,7 @@ public class ContentRepositoryImpl implements ContentRepository, Closeable {
     private final SecurityProvider securityProvider;
     private final QueryIndexProvider indexProvider;
     private final QueryEngineSettings queryEngineSettings;
+    private final Descriptors baseDescriptors;
 
     private GenericDescriptors descriptors;
     
@@ -134,13 +135,15 @@ public class ContentRepositoryImpl implements ContentRepository, Closeable {
                                  @Nonnull String defaultWorkspaceName,
                                  QueryEngineSettings queryEngineSettings,
                                  @Nullable QueryIndexProvider indexProvider,
-                                 @Nonnull SecurityProvider securityProvider) {
+                                 @Nonnull SecurityProvider securityProvider,
+                                 @Nullable Descriptors baseDescriptors) {
         this.nodeStore = checkNotNull(nodeStore);
         this.commitHook = checkNotNull(commitHook);
         this.defaultWorkspaceName = checkNotNull(defaultWorkspaceName);
         this.securityProvider = checkNotNull(securityProvider);
         this.queryEngineSettings = queryEngineSettings != null ? queryEngineSettings : new QueryEngineSettings();
         this.indexProvider = indexProvider != null ? indexProvider : new CompositeQueryIndexProvider();
+        this.baseDescriptors = baseDescriptors;
     }
 
     @Nonnull
@@ -185,7 +188,7 @@ public class ContentRepositoryImpl implements ContentRepository, Closeable {
         final Value trueValue = valueFactory.createValue(true);
         final Value falseValue = valueFactory.createValue(false);
 
-        GenericDescriptors gd = new GenericDescriptors()
+        GenericDescriptors gd = new GenericDescriptors(baseDescriptors)
                 .put(IDENTIFIER_STABILITY, valueFactory.createValue(Repository.IDENTIFIER_STABILITY_METHOD_DURATION), true, true)
                 .put(LEVEL_1_SUPPORTED, trueValue, true, true)
                 .put(LEVEL_2_SUPPORTED, trueValue, true, true)
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
index 5acebda..48112c4 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
@@ -61,6 +61,13 @@ public class ClusterNodeInfo {
     public static final String LEASE_END_KEY = "leaseEnd";
 
     /**
+     * The key for the root-revision of the last background write (of unsaved modifications)
+     * - that is: the last root-revision written by the instance in case of a clean shutdown
+     * or via recovery of another instance in case of a crash
+     */
+    public static final String LAST_WRITTEN_ROOT_REV_KEY = "lastWrittenRootRev";
+
+    /**
      * The state of the cluster. On proper shutdown the state should be cleared.
      *
      * @see org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfo.ClusterNodeState
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
index 969f437..5493f12 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
@@ -77,4 +77,9 @@ public class ClusterNodeInfoDocument extends Document {
     private RecoverLockState getRecoveryState(){
         return RecoverLockState.fromString((String) get(ClusterNodeInfo.REV_RECOVERY_LOCK));
     }
+
+    /** the root-revision of the last background write (of unsaved modifications) **/
+    public String getLastWrittenRootRev() {
+        return (String) get(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY);
+    }
 }
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java
new file mode 100644
index 0000000..e8a3ad6
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterStateChangeListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+/**
+ * DocumentNS-internal listener that gets invoked when a change in the 
+ * clusterNodes collection (active/inactive/timed out/recovering) is detected.
+ */
+public interface ClusterStateChangeListener {
+
+    /**
+     * Informs the listener that DocumentNodeStore has discovered a change in the
+     * clusterNodes collection.
+     */
+    public void handleClusterStateChange();
+    
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java
new file mode 100644
index 0000000..a815e02
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterView.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
+
+/**
+ * A ClusterView represents the state of a cluster at a particular 
+ * moment in time.
+ * <p>
+ * This is a combination of what is stored in the ClusterViewDocument
+ * and the list of instances that currently have a backlog.
+ * <p>
+ * In order to be able to differentiate and clearly identify the
+ * different states an instance is in, the ClusterView uses a 
+ * slightly different terminology of states that it reports:
+ * <ul>
+ *  <li>Active: (same as in the ClusterViewDocument) an instance that
+ *  is alive and has no recoveryLock set. Whether or not the lease
+ *  has timed out is ignored. If the lease would be timed out, this
+ *  would be immediately noticed by one of the instances and the 
+ *  affected instance would thus be recovered soon.</li>
+ *  <li>Deactivating: An instance that is either recovering
+ *  (which is the state reported from the ClusterViewDocument) -
+ *  ie it was active until now but the lease has just timed out
+ *  and one of the peer instances noticed so it does a recovery now -
+ *  or it is inactive but some of its changes are still in the backlog
+ *  (the latter is not tracked in the ClusterViewDocument, instead
+ *  instances with a backlog are in the 'inactive' bucket there).</li>
+ *  <li>Inactive: An instance that is both inactive from a 
+ *  clusterNodes/ClusterViewDocument point of view (ie no longer
+ *  active and already recovered) and it has no backlog anymore.
+ *  </li>
+ * </ul>
+ * The JSON generated by the ClusterView (which is propagated to JMX)
+ * has the following fields:
+ * <ul>
+ * <li>seq = sequence number: this is a monotonically increasing number
+ * assigned to each incarnation of the persisted clusterView (in the
+ * settings collection). It can be used to take note of the fact that
+ * a view has changed even though perhaps all activeIds are still the same
+ * (eg when the listener would have missed a few changes). It can also be
+ * used to tell with certainty that 'anything has changed' compared to
+ * the clusterView with a previous sequence number</li>
+ * <li>final = is final: this is a boolean indicating whether or not the
+ * view with a particular sequence number is final (not going to change
+ * anymore) or whether the discovery lite takes the freedom to modify
+ * the view in the future (false). So whenever 'final' is false, then
+ * the view must be treated as 'in flux' and perhaps the user should
+ * wait with doing any conclusions. That's not to say that if 'final' is
+ * false, that the information provided in active/deactivating/inactive
+ * is wrong - that's of course not the case - that info is always correct.
+ * But when 'final' is false it just means that active/deactivating/inactive
+ * for a given sequence number might change.</li>
+ * <li>id = cluster view id: this is the unique, stable identifier of the local cluster.
+ * The idea of this id is to provide both an actual identifier for the local 
+ * cluster as well as a 'namespace' for the instanceIds therein. The instanceIds
+ * are all just simply integers and can of course be the same for instances
+ * in different clusters.</li>
+ * <li>me = my local instance id: this is the id of the local instance 
+ * as managed by DocumentNodeStore</li>
+ * <li>active = active instance ids: this is the list of instance ids that are all
+ * currently active in the local cluster. The ids are managed by DocumentNodeStore</li>
+ * <li>deactivating = deactivating instance ids: this is the list of instance ids that
+ * are all in the process of deactivating and for which therefore some data
+ * might still be making its way to the local instance. So any changes that
+ * were done by instances that are deactivating might not yet be visible locally</li>
+ * <li>inactive = inactive instance ids: this is the list of instance ids that
+ * are not running nor do they have any data pending to become visible by the local
+ * instance</li>
+ * </ul>
+ */
+class ClusterView {
+
+    /** the json containing the complete information of the state of this ClusterView. 
+     * Created at constructor time for performance reasons (json will be polled via
+     * JMX very frequently, thus must be provided fast)
+     */
+    private final String json;
+
+    /**
+     * Factory method that creates a ClusterView given a ClusterViewDocument and a list
+     * of instances that currently have a backlog.
+     * <p>
+     * The ClusterViewDocument contains instances in the following states:
+     * <ul>
+     * <li>active</li>
+     * <li>recovering</li>
+     * <li>inactive</li>
+     * </ul>
+     * The ClusterView however reports these upwards as follows:
+     * <ul>
+     * <li>active: this is 1:1 the active ones from the ClusterViewDocument</li>
+     * <li>deactivating: this includes the recovering ones from the ClusterViewDocument plus those passed 
+     * to this method in the backlogIds parameter</li>
+ * <li>inactive: this is the inactive ones from the ClusterViewDocument <b>minus</b> the backlogIds passed</li>
+     * </ul>
+     * @param localInstanceId the id of the local instance (me)
+     * @param clusterViewDoc the ClusterViewDocument which contains the currently persisted cluster view
+     * @param backlogIds the ids that the local instances still has not finished a background read for
+     * and thus still have a backlog
+     * @return the ClusterView representing the provided info
+     */
+    static ClusterView fromDocument(int localInstanceId, ClusterViewDocument clusterViewDoc, Set<Integer> backlogIds) {
+        final Set<Integer> activeIds = clusterViewDoc.getActiveIds();
+        final Set<Integer> deactivatingIds = new HashSet<Integer>();
+        deactivatingIds.addAll(clusterViewDoc.getRecoveringIds());
+        deactivatingIds.addAll(backlogIds);
+        final Set<Integer> inactiveIds = new HashSet<Integer>();
+        inactiveIds.addAll(clusterViewDoc.getInactiveIds());
+        if (!inactiveIds.removeAll(backlogIds) && backlogIds.size()>0) {
+            // then not all backlogIds were listed as inactive - which is contrary to the expectation
+            // in which case we indeed do a paranoia exception here:
+            throw new IllegalStateException("not all backlogIds ("+backlogIds+") are part of inactiveIds ("+clusterViewDoc.getInactiveIds()+")");
+        }
+        return new ClusterView(clusterViewDoc.getViewSeqNum(), backlogIds.size()==0, clusterViewDoc.getClusterViewId(), localInstanceId,
+                activeIds, deactivatingIds, inactiveIds);
+    }
+
+    ClusterView(final long viewSeqNum,
+            final boolean viewFinal,
+            final String clusterViewId, 
+            final int localId, 
+            final Set<Integer> activeIds,
+            final Set<Integer> deactivatingIds,
+            final Set<Integer> inactiveIds) {
+        if (viewSeqNum<0) {
+            throw new IllegalStateException("viewSeqNum must be zero or higher: "+viewSeqNum);
+        }
+        if (clusterViewId==null || clusterViewId.length()==0) {
+            throw new IllegalStateException("clusterViewId must not be null or empty: "+clusterViewId);
+        }
+        if (localId<0) {
+            throw new IllegalStateException("localId must be zero or higher: "+localId);
+        }
+        if (activeIds==null || activeIds.size()==0) {
+            throw new IllegalStateException("activeIds must not be null or empty");
+        }
+        if (deactivatingIds==null) {
+            throw new IllegalStateException("deactivatingIds must not be null");
+        }
+        if (inactiveIds==null) {
+            throw new IllegalStateException("inactiveIds must not be null");
+        }
+
+        json = asJson(viewSeqNum, viewFinal, clusterViewId, localId, activeIds, deactivatingIds, inactiveIds);
+    }
+
+    /** Converts the provided parameters into the clusterview json that will be provided via JMX **/
+    private String asJson(final long viewSeqNum, final boolean viewFinal, final String clusterViewId, final int localId, 
+            final Set<Integer> activeIds,
+            final Set<Integer> deactivatingIds,
+            final Set<Integer> inactiveIds) {
+        JsopBuilder builder = new JsopBuilder();
+        builder.object();
+        builder.key("seq").value(viewSeqNum);
+        builder.key("final").value(viewFinal);
+        builder.key("id").value(clusterViewId);
+        builder.key("me").value(localId);
+        builder.key("active").array();
+        for (Iterator<Integer> it = activeIds.iterator(); it
+                .hasNext();) {
+            Integer anInstance = it.next();
+            builder.value(anInstance);
+        }
+        builder.endArray();
+        builder.key("deactivating").array();
+        for (Iterator<Integer> it = deactivatingIds.iterator(); it
+                .hasNext();) {
+            Integer anInstance = it.next();
+            builder.value(anInstance);
+        }
+        builder.endArray();
+        builder.key("inactive").array();
+        for (Iterator<Integer> it = inactiveIds.iterator(); it
+                .hasNext();) {
+            Integer anInstance = it.next();
+            builder.value(anInstance);
+        }
+        builder.endArray();
+        builder.endObject();
+        return builder.toString();
+    }
+    
+    /** Debugging toString() **/
+    @Override
+    public String toString() {
+        return "a ClusterView["+json+"]";
+    }
+    
+    /** This is the main getter that will be polled via JMX **/ 
+    String asDescriptorValue() {
+        return json;
+    }
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java
new file mode 100644
index 0000000..21d5b9a
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the document stored in the settings collection containing
+ * a 'cluster view'.
+ * <p>
+ * A 'cluster view' is the state of the membership of instances that are
+ * or have all been connected to the same oak repository.
+ * The 'cluster view' is maintained by all instances in the cluster
+ * concurrently - the faster one wins. Its information is derived
+ * from the clusterNodes collection. From there the following three
+ * states are derived and instances are grouped into these:
+ * <ul>
+ *  <li>Active: an instance is active and no recoveryLock is currently
+ *  acquired. The lease timeout is ignored. When the lease times out, this
+ *  is noticed by one of the instances at some point and a recovery
+ *  is started, at which point the instance transitions from 'Active' 
+ *  to 'Recovering'.</li>
+ *  <li>Recovering: an instance that was active but currently has the
+ *  recoveryLock acquired by one of the instances.</li>
+ *  <li>Inactive: an instance is not set to active (in which case
+ *  the recoveryLock is never set)</li>
+ * </ul>
+ * <p>
+ * Note that the states managed in this ClusterViewDocument differs
+ * from the one from ClusterView - since ClusterView also manages
+ * the fact that after a recovery of a crashed instance there could
+ * be a 'backlog' of changes which it doesn't yet see until a
+ * background read is performed.
+ */
+class ClusterViewDocument {
+    
+    private static final Logger logger = LoggerFactory.getLogger(ClusterViewDocument.class);
+
+    /** the id of this document is always 'clusterView' **/
+    private static final String CLUSTERVIEW_DOC_ID = "clusterView";
+    
+    // keys that we store in the root document - and in the history
+    /** document key that stores the stable id of the cluster (will never change) 
+     * (Note: a better term would have been just clusterId - 
+     * but that one is already occupied with what should actually be called clusterNodeId or just nodeId) **/
+    static final String CLUSTER_VIEW_ID_KEY = "clusterViewId";
+    
+    /** document key that stores the monotonically incrementing sequence number of the cluster view. Any update will increase this by 1 **/
+    static final String VIEW_SEQ_NUM_KEY = "seqNum";
+    
+    /** document key that stores the comma-separated list of active instance ids **/
+    static final String ACTIVE_KEY = "active";
+    
+    /** document key that stores the comma-separated list of inactive instance ids 
+     * (they might still have a backlog, that is handled in ClusterView though, never persisted */
+    static final String INACTIVE_KEY = "inactive";
+    
+    /** document key that stores the comma-separated list of recovering instance ids **/
+    static final String RECOVERING_KEY = "recovering";
+    
+    /** document key that stores the date and time when this view was created - for debugging purpose only **/
+    private static final String CREATED_KEY = "created";
+    
+    /** document key that stores the id of the instance that created this view - for debugging purpose only **/
+    private static final String CREATOR_KEY = "creator";
+    
+    /** document key that stores the date and time when this view was retired - for debugging purpose only **/
+    private static final String RETIRED_KEY = "retired";
+    
+    /** document key that stores the id of the instance that retired this view - for debugging purpose only **/
+    private static final String RETIRER_KEY = "retirer";
+    
+    /** document key that stores a short, limited history of previous cluster views - for debugging purpose only **/
+    private static final String CLUSTER_VIEW_HISTORY_KEY = "clusterViewHistory";
+    
+    /** the format used when storing date+time **/
+    private static final DateFormat standardDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+    /** number of elements kept in the CLUSTER_VIEW_HISTORY_KEY field **/
+    private static final int HISTORY_LIMIT = 10; 
+
+    /** the monotonically incrementing sequence number of this cluster view **/
+    private final long viewSeqNum;
+
+    /** the stable id of this cluster **/
+    private final String clusterViewId;
+
+    /** the ids of instances that are active at this moment **/
+    private final Integer[] activeIds;
+    
+    /** the ids of instances that are recovering (lastRev-recovery) at this moment **/
+    private final Integer[] recoveringIds;
+
+    /** the ids of instances that are inactive at this moment **/
+    private final Integer[] inactiveIds;
+
+    /** the short, limited history of previous cluster views, for debugging only **/
+    private final Map<Object, String> viewHistory;
+
+    /** the date+time at which this view was created, for debugging only **/
+    private final String createdAt;
+
+    /** the id of the instance that created this view, for debugging only **/
+    private final Integer createdBy;
+
+    /**
+     * Main method by which the ClusterViewDocument is updated in the settings collection
+     * @return the resulting ClusterViewDocument as just updated in the settings collection - or null
+     * if another instance was updating the clusterview concurrently (in which case the caller
+     * should re-read first and possibly re-update if needed)
+     */
+    static ClusterViewDocument readOrUpdate(DocumentNodeStore documentNodeStore,
+            Set<Integer> activeIds, Set<Integer> recoveringIds, Set<Integer> inactiveIds) {
+        logger.trace("readOrUpdate: expected: activeIds: {}, recoveringIds: {}, inactiveIds: {}",
+                activeIds, recoveringIds, inactiveIds);
+        if (activeIds==null || activeIds.size()==0) {
+            logger.info("readOrUpdate: activeIds must not be null or empty");
+            throw new IllegalStateException("activeIds must not be null or empty");
+        }
+        final int localClusterId = documentNodeStore.getClusterId();
+
+        final ClusterViewDocument previousView = doRead(documentNodeStore);
+        if (previousView!=null) {
+            if (previousView.matches(activeIds, recoveringIds, inactiveIds)) {
+                logger.trace("readOrUpdate: view unchanged, returning: {}", previousView);
+                return previousView;
+            }
+        }
+        logger.trace("readOrUpdate: view change detected, going to update from {} to activeIds: {}, recoveringIds: {}, inactiveIds: {}", 
+                previousView, activeIds, recoveringIds, inactiveIds);
+        final UpdateOp updateOp = new UpdateOp(CLUSTERVIEW_DOC_ID, true);
+        final Date now = new Date();
+        updateOp.set(ACTIVE_KEY, setToCsv(activeIds));
+        updateOp.set(RECOVERING_KEY, setToCsv(recoveringIds));
+        updateOp.set(INACTIVE_KEY, setToCsv(inactiveIds));
+        updateOp.set(CREATED_KEY, standardDateFormat.format(now));
+        updateOp.set(CREATOR_KEY, localClusterId);
+        Map<Object,String> historyMap = new HashMap<Object,String>();
+        if (previousView!=null) {
+            Map<Object, String> previousHistory = previousView.getHistory();
+            if (previousHistory!=null) {
+                historyMap.putAll(previousHistory);
+            }
+            
+            historyMap.put(Revision.newRevision(localClusterId).toString(), 
+                    asHistoryEntry(previousView, localClusterId, now));
+        }
+        applyHistoryLimit(historyMap);
+        updateOp.set(CLUSTER_VIEW_HISTORY_KEY, historyMap);
+        
+        final Long newViewSeqNum;
+        if (previousView==null) {
+            // we are the first ever, looks like, that the clusterview is defined
+            // so we can use viewId==1 and we make sure no other cluster node
+            // tries to create this first one simultaneously - so we use
+            // 'create'
+            
+            // going via 'create' requires ID to be set again (not only in new UpdateOp(id,isNew)):
+            updateOp.set(Document.ID, CLUSTERVIEW_DOC_ID);
+            final ArrayList<UpdateOp> updateOps = new ArrayList<UpdateOp>();
+            newViewSeqNum=1L;
+            updateOp.setNew(true); // paranoia as that's already set above
+            updateOp.set(VIEW_SEQ_NUM_KEY, newViewSeqNum);
+            // first view ever => choose a new unique clusterViewId
+            final String clusterViewId = UUID.randomUUID().toString();
+            updateOp.set(CLUSTER_VIEW_ID_KEY, clusterViewId);
+            updateOps.add(updateOp);
+            logger.debug("updateAndRead: trying to create the first ever clusterView - hence {}={} and {}={}", VIEW_SEQ_NUM_KEY, newViewSeqNum, CLUSTER_VIEW_ID_KEY, clusterViewId);
+            if (!documentNodeStore.getDocumentStore().create(Collection.SETTINGS, updateOps)) {
+                logger.debug("updateAndRead: someone else just created the first view ever while I tried - reread that one later");
+                return null;
+            }
+        } else {
+            // there were earlier clusterViews (the normal case) - thus we 
+            // use 'findAndUpdate' with the condition that 
+            // the view id is still at the previousview one
+            final Long previousViewSeqNum = previousView.getViewSeqNum();
+            updateOp.setNew(false); // change to false from true above
+            updateOp.equals(VIEW_SEQ_NUM_KEY, null, previousViewSeqNum);
+            newViewSeqNum = previousViewSeqNum+1;
+            updateOp.set(VIEW_SEQ_NUM_KEY, newViewSeqNum);
+            logger.debug("updateAndRead: trying to update the clusterView to {}={} ",VIEW_SEQ_NUM_KEY,newViewSeqNum);
+            if (documentNodeStore.getDocumentStore().findAndUpdate(Collection.SETTINGS, updateOp)==null) {
+                logger.debug("updateAndRead: someone else just updated the view which I wanted to do as well - reread that one later");
+                return null;
+            }
+        }
+        
+        // whatever the outcome of the above - we don't care -
+        // re-reading will in any case definitely show what has been persisted
+        // and if the re-read view contains the same id, it is what we have written
+        // - otherwise someone else came in between and we have to step back and retry
+        final ClusterViewDocument readResult = doRead(documentNodeStore);
+        if (readResult==null) {
+            logger.debug("updateAndRead: got null from read - whatever the exact reason, we must retry in a moment.");
+            return null;
+        } else if (newViewSeqNum.equals(readResult.getViewSeqNum())) {
+            logger.debug("updateAndRead: matching view - no change");
+            return readResult;
+        } else {
+            logger.debug("updateAndRead: someone else in the cluster was updating right after I also succeeded - re-read in a bit");
+            return null;
+        }
+    }
+
+    /**
+     * Pruning method that makes sure the history never gets larger than HISTORY_LIMIT
+     * @param historyMap the pruning is done directly on this map
+     */
+    private static void applyHistoryLimit(Map<Object, String> historyMap) {
+        while (historyMap.size()>HISTORY_LIMIT) {
+            // scan all keys (they may be Revision or String) for the entry with the oldest revision timestamp
+            String oldestRevision = null;
+            for (Iterator<Object> it = historyMap.keySet().iterator(); it.hasNext();) {
+                final Object obj = it.next();
+                String r;
+                if (obj instanceof Revision) {
+                    r = ((Revision)obj).toString();
+                } else {
+                    r = (String) obj;
+                }
+                if (oldestRevision==null) {
+                    oldestRevision = r;
+                } else if (Revision.getTimestampDifference(Revision.fromString(r), Revision.fromString(oldestRevision))<0) {
+                    oldestRevision = r;
+                }
+            }
+            if (oldestRevision==null) {
+                break;
+            } else {
+                if (historyMap.remove(oldestRevision)==null) {
+                    if (historyMap.remove(Revision.fromString(oldestRevision))==null) {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /** Converts a previous clusterView document into a JSON history-entry string (built via JsopBuilder) **/
+    private static String asHistoryEntry(final ClusterViewDocument previousView, int retiringClusterNodeId, Date retireTime) {
+        String h;
+        JsopBuilder b = new JsopBuilder();
+        b.object();
+        b.key(VIEW_SEQ_NUM_KEY);
+        b.value(previousView.getViewSeqNum());
+        b.key(CREATED_KEY);
+        b.value(String.valueOf(previousView.getCreatedAt()));
+        b.key(CREATOR_KEY);
+        b.value(previousView.getCreatedBy());
+        b.key(RETIRED_KEY);
+        b.value(String.valueOf(standardDateFormat.format(retireTime)));
+        b.key(RETIRER_KEY);
+        b.value(retiringClusterNodeId);
+        b.key(ACTIVE_KEY);
+        b.value(arrayToCsv((Integer[])previousView.getActiveIds().toArray(new Integer[0])));
+        b.key(RECOVERING_KEY);
+        b.value(arrayToCsv((Integer[])previousView.getRecoveringIds().toArray(new Integer[0])));
+        b.key(INACTIVE_KEY);
+        b.value(arrayToCsv((Integer[])previousView.getInactiveIds().toArray(new Integer[0])));
+        b.endObject();
+        h = b.toString();
+        return h;
+    }
+    
+    /** helper method to convert a set to a comma-separated string (without using toString() for safety); returns null for a null/empty set **/
+    private static String setToCsv(Set<Integer> ids) {
+        if (ids==null || ids.size()==0) {
+            return null;
+        }
+        StringBuilder sb = new StringBuilder();
+        Iterator<Integer> it = ids.iterator();
+        while(it.hasNext()) {
+            final Integer id = it.next();
+            if (sb.length()!=0) {
+                sb.append(",");
+            }
+            sb.append(id);
+        }
+        return sb.toString();
+    }
+    
+    /** helper method to convert an array to a comma-separated string; returns null for a null/empty array **/
+    static String arrayToCsv(Integer[] arr) {
+        if (arr==null || arr.length==0) {
+            return null;
+        } else if (arr.length==1) {
+            return String.valueOf(arr[0]);
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append(String.valueOf(arr[0]));
+        for (int i = 1; i < arr.length; i++) {
+            final Integer a = arr[i];
+            sb.append(",");
+            sb.append(String.valueOf(a));
+        }
+        return sb.toString();
+    }
+    
+    /** inverse helper method which converts a comma-separated string into an Integer array - throws NumberFormatException on non-numeric tokens **/
+    static Integer[] csvToIntegerArray(String csv) {
+        if (csv==null) {
+            return null;
+        }
+        String[] split = csv.split(",");
+        Integer[] result = new Integer[split.length];
+        for (int i = 0; i < split.length; i++) {
+            result[i] = Integer.parseInt(split[i]);
+        }
+        return result;
+    }
+    
+    /** internal reader of an existing clusterView document from the settings collection - returns null if the document is missing or invalid **/
+    private static ClusterViewDocument doRead(DocumentNodeStore documentNodeStore) {
+        final DocumentStore documentStore = documentNodeStore.getDocumentStore();
+        final Document doc = documentStore.find(Collection.SETTINGS, "clusterView", -1 /* -1; avoid caching */);
+        if (doc==null) {
+            return null;
+        } else {
+            final ClusterViewDocument clusterView = new ClusterViewDocument(doc);
+            if (clusterView.isValid()) {
+                return clusterView;
+            } else {
+                logger.warn("read: clusterView document is not valid: "+doc.format());
+                return null;
+            }
+        }
+    }
+
+    /** comparison helper that compares an Integer array with a set - null and empty are treated as equivalent **/
+    static boolean matches(Integer[] expected, Set<Integer> actual) {
+        final boolean expectedIsEmpty = expected==null || expected.length==0;
+        final boolean actualIsEmpty = actual==null || actual.size()==0;
+        if (expectedIsEmpty && actualIsEmpty) {
+            // if both are null or empty, they match
+            return true;
+        }
+        if (expectedIsEmpty!=actualIsEmpty) {
+            // if one of them is only empty, but the other not, then they don't match
+            return false;
+        }
+        if (expected.length!=actual.size()) {
+            // different size
+            return false;
+        }
+        for (int i = 0; i < expected.length; i++) {
+            Integer aMemberId = expected[i];
+            if (!actual.contains(aMemberId)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @SuppressWarnings("unchecked")
+    ClusterViewDocument(Document doc) {
+        if (doc==null) {
+            throw new IllegalArgumentException("doc must not be null");
+        }
+        this.clusterViewId = (String) doc.get(CLUSTER_VIEW_ID_KEY);
+        this.viewSeqNum = (Long) doc.get(VIEW_SEQ_NUM_KEY);
+        this.createdAt = (String) doc.get(CREATED_KEY);
+        this.createdBy = (Integer) doc.get(CREATOR_KEY);
+        // the three id arrays below default to empty (never null) when the field is absent or not a String
+        final Object obj = doc.get(ACTIVE_KEY);
+        if (obj==null || !(obj instanceof String)) {
+            logger.trace("<init>: {} : {}", ACTIVE_KEY, obj);
+            this.activeIds = new Integer[0];
+        } else {
+            this.activeIds = csvToIntegerArray((String) obj);
+        }
+        // recovering ids: same defaulting behavior as for the active ids
+        final Object obj2 = doc.get(RECOVERING_KEY);
+        if (obj2==null || !(obj2 instanceof String)) {
+            logger.trace("<init>: {} : {}", RECOVERING_KEY, obj2);
+            this.recoveringIds= new Integer[0];
+        } else {
+            this.recoveringIds = csvToIntegerArray((String) obj2);
+        }
+        // inactive ids: same defaulting behavior as for the active ids
+        final Object obj3 = doc.get(INACTIVE_KEY);
+        if (obj3==null || !(obj3 instanceof String)) {
+            logger.trace("<init>: {} : {}", INACTIVE_KEY, obj3);
+            this.inactiveIds = new Integer[0];
+        } else {
+            this.inactiveIds = csvToIntegerArray((String) obj3);
+        }
+        // the history map however is kept null (not empty) when absent or malformed
+        final Object obj4 = doc.get(CLUSTER_VIEW_HISTORY_KEY);
+        if (obj4==null || !(obj4 instanceof Map)) {
+            logger.trace("<init> viewHistory is null");
+            this.viewHistory = null;
+        } else {
+            this.viewHistory = ((Map<Object,String>)obj4);
+        }
+    }
+    
+    /** Returns the set of active ids of this cluster view (as a defensive copy) **/
+    Set<Integer> getActiveIds() {
+        return new HashSet<Integer>(Arrays.asList(activeIds));
+    }
+
+    /** Returns the set of recovering ids of this cluster view (as a defensive copy) **/
+    Set<Integer> getRecoveringIds() {
+        return new HashSet<Integer>(Arrays.asList(recoveringIds));
+    }
+
+    /** Returns the set of inactive ids of this cluster view (as a defensive copy) **/
+    Set<Integer> getInactiveIds() {
+        return new HashSet<Integer>(Arrays.asList(inactiveIds));
+    }
+
+    /** Returns the history map **/
+    private Map<Object,String> getHistory() {
+        return viewHistory;
+    }
+
+    @Override
+    public String toString() {
+        return "a ClusterView[valid="+isValid()+", viewSeqNum="+viewSeqNum+", clusterViewId="+clusterViewId+
+                ", activeIds="+arrayToCsv(activeIds)+", recoveringIds="+arrayToCsv(recoveringIds)+
+                ", inactiveIds="+arrayToCsv(inactiveIds)+"]";
+    }
+    
+    boolean isValid() { // viewSeqNum is a boxed Long read from the document and can be null - guard before unboxing
+        return viewSeqNum!=null && viewSeqNum>=0 && activeIds!=null && activeIds.length>0;
+    }
+    
+    /** Returns the date+time when this view was created, for debugging purpose only **/
+    String getCreatedAt() {
+        return createdAt;
+    }
+    
+    /** Returns the id of the instance that created this view, for debugging purpose only **/
+    int getCreatedBy() {
+        return createdBy;
+    }
+    
+    /** Returns the monotonically incrementing sequence number of this view **/
+    long getViewSeqNum() {
+        return viewSeqNum;
+    }
+    
+    /** Returns a UUID representing this cluster - will never change, propagates from view to view **/
+    String getClusterViewId() {
+        return clusterViewId;
+    }
+    
+    private boolean matches(Set<Integer> activeIds, Set<Integer> recoveringIds, Set<Integer> inactiveIds) {
+        if (!matches(this.activeIds, activeIds)) {
+            return false;
+        }
+        if (!matches(this.recoveringIds, recoveringIds)) {
+            return false;
+        }
+        if (!matches(this.inactiveIds, inactiveIds)) {
+            return false;
+        }
+        return true;
+    }
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java
new file mode 100644
index 0000000..a0cb398
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteService.java
@@ -0,0 +1,615 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import javax.jcr.Value;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.jackrabbit.commons.SimpleValueFactory;
+import org.apache.jackrabbit.oak.api.Descriptors;
+import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The DocumentDiscoveryLiteService is taking care of providing a
+ * repository descriptor that contains the current cluster-view details.
+ * <p>
+ * The clusterView is provided via a repository descriptor (see
+ * OAK_DISCOVERYLITE_CLUSTERVIEW)
+ * <p>
+ * The cluster-view lists all instances (ever) known in the cluster
+ * in one of the following states:
+ * <ul>
+ * <li>active: the instance is currently running and has an up-to-date
+ * lease</li>
+ * <li>deactivating: the instance failed to update the lease recently
+ * thus a recovery is happening - or it has just finished and the local
+ * instance is yet to do a backgroundRead before it has finished
+ * reading the crashed/shutdown instance's last changes</li>
+ * <li>inactive: the instance is currently not running and all its
+ * changes have been seen by the local instance</li>
+ * </ul>
+ * <p>
+ * Additionally, the cluster-view is assigned a monotonically increasing
+ * sequence number to. This sequence number is persisted, thus all
+ * instances in the cluster will show the same sequence number for 
+ * a particular cluster-view in time.
+ * <p>
+ * Note that the 'deactivating' state might be hiding some complexity
+ * that is deliberately not shown: for the documentNS the state
+ * 'deactivating' consists of two substates: 'recovering' as in
+ * _lastRevs are updated, and 'backlog processing' for a pending
+ * backgroundRead to get the latest head state of a crashed/shutdown
+ * instance. So when an instance is in 'deactivating' state, it is
+ * not indicated via the cluster-view whether it is recovering or
+ * has backlog to process. However, the fact that an instance has
+ * to yet do a backgroundRead to get changes is a per-instance
+ * story: other instances might already have done the backgroundRead
+ * and thus no longer have a backlog for the instance(s) that left.
+ * Even though 'deactivating' therefore is dependent on the instance
+ * you get the information from, the cluster-view must have a 
+ * sequence number that uniquely identifies it in the cluster. 
+ * These two constraints conflict. As a simple solution to handle
+ * this case nevertheless, the 'final' flag has been introduced:
+ * the cluster-view has this flag 'final' set to true when the
+ * view is final and nothing will be changed in this sequence number
+ * anymore. If the 'final' flag is false however it indicates
+ * that the cluster-view with this particular sequence number might
+ * still experience a change (more concretely: the deactivating instances
+ * might change). Note that there are alternatives to this 'final' flag
+ * have been discussed, such as using vector-counters, but there
+ * was no obvious gain achieved by using an alternative approach.
+ * <p>
+ * In other words: whenever the 'final' flag is false, the view
+ * must be interpreted as 'in flux' wrt the deactivating/inactive instances
+ * and any action that depends on stable deactivating/inactive instances
+ * must not yet be done until the 'final' flag becomes true.
+ * <p>
+ * Underneath, the DocumentDiscoveryLiteService uses the clusterNodes
+ * collection to derive the clusterView, which it stores in the settings
+ * collection. Whenever it updates the clusterView it increments the
+ * sequence number by 1.
+ * <p>
+ * While this new 'clusterView' document in the settings collection
+ * sounds like redundant data (since it is just derived from the clusterNodes),
+ * it actually is not. By persisting the clusterView it becomes the
+ * new source of truth wrt what the clusterView looks like. And no
+ * two instances in the same cluster can make different conclusions
+ * based eg on different clocks they have or based on reading the clusterNodes
+ * in a slightly different moment etc. Also, the clusterView allows
+ * to store the currently two additional attributes: the clusterViewId
+ * (which is the permanent id for this cluster similar to the slingId being
+ * a permanent id for an instance) as well as the sequence number 
+ * (which allows the instances to make reference to the same clusterView,
+ * and be able to simply detect whether anything has changed)
+ * <p>
+ * Prerequisites that the clusterView mechanism is stable:
+ * <ul>
+ *  <li>the machine clocks are reasonably in sync - that is, they should
+ *  be off by magnitudes less than the lease updateFrequency/timeout</li>
+ *  <li>the write-delays from any instance to the mongo server where
+ *  the clusterNodes and settings collections are stored should be
+ *  very fast - at least orders of magnitudes lower again than the
+ *  lease timeout</li>
+ *  <li>when this instance notices that others have kicked it out of the
+ *  clusterView (which it can find out when either its clusterNodes
+ *  document is set to recovering or it is not in the clusterView
+ *  anymore, although it just was - ie not just because of a fresh start),
+ *  then this instance must step back gracefully. 
+ *  The exact definition is to be applied elsewhere - but it should
+ *  include: stopping to update its own lease, waiting for the view
+ *  to have stabilized - waiting for recovery of its own instance
+ *  by the remaining instances in the cluster to have finished -
+ *  and then probably waiting for another gracePeriod until it
+ *  might rejoin the cluster. In between, any commit should
+ *  fail with BannedFromClusterException</li>
+ * </ul>
+ * @see #OAK_DISCOVERYLITE_CLUSTERVIEW
+ */
+@Component(immediate = true, name = DocumentDiscoveryLiteService.COMPONENT_NAME)
+@Service(value = { DocumentDiscoveryLiteService.class, Observer.class })
+public class DocumentDiscoveryLiteService implements ClusterStateChangeListener, Observer {
+
+    static final String COMPONENT_NAME = "org.apache.jackrabbit.oak.plugins.document.DocumentDiscoveryLiteService";
+    
+    /** Name of the repository descriptor via which the clusterView is published - which is the raison d'etre of the DocumentDiscoveryLiteService **/
+    public static final String OAK_DISCOVERYLITE_CLUSTERVIEW = "oak.discoverylite.clusterview";
+
+    private static final Logger logger = LoggerFactory.getLogger(DocumentDiscoveryLiteService.class);
+    
+    /** describes the reason why the BackgroundWorker should be woken up **/
+    private static enum WakeupReason {
+        CLUSTER_STATE_CHANGED,
+        BACKGROUND_READ_FINISHED
+    }
+    
+    /** The BackgroundWorker is taking care of regularly invoking checkView - which in turn will detect if anything changed **/
+    private class BackgroundWorker implements Runnable {
+
+        final Random random = new Random();
+        
+        boolean stopped = false;
+        
+        private void stop() {
+            logger.trace("stop: start");
+            synchronized(BackgroundWorker.this) {
+                stopped = true;
+            }
+            logger.trace("stop: end");
+        }
+        
+        @Override
+        public void run() {
+            logger.info("BackgroundWorker.run: start");
+            try{
+                doRun();
+            } finally {
+                logger.info("BackgroundWorker.run: end {finally}");
+            }
+        }
+
+        private void doRun() {
+            while(!stopped) {
+                try{
+                    logger.trace("BackgroundWorker.doRun: going to call checkView");
+                    final boolean shortSleep = checkView();
+                    logger.trace("BackgroundWorker.doRun: checkView terminated with {} (=shortSleep)", shortSleep);
+                    final long sleepMillis = shortSleep ? (50 + random.nextInt(450)) : 5000;
+                    logger.trace("BackgroundWorker.doRun: sleeping {}ms", sleepMillis);
+                    synchronized(BackgroundWorker.this) {
+                        if (stopped) return;
+                        BackgroundWorker.this.wait(sleepMillis);
+                        if (stopped) return;
+                    }
+                    logger.trace("BackgroundWorker.doRun: done sleeping, looping");
+                } catch(Exception e) {
+                    logger.error("doRun: got an exception: "+e, e);
+                    try{
+                        Thread.sleep(5000);
+                    } catch(Exception e2) {
+                        logger.error("doRun: got an exception while sleeping due to another exception: "+e2, e2);
+                    }
+                }
+            }
+        }
+        
+    }
+    
+    /** This provides the 'clusterView' repository descriptors **/
+    private class DiscoveryLiteDescriptor implements Descriptors {
+
+        final SimpleValueFactory factory = new SimpleValueFactory();
+        
+        @Override
+        public String[] getKeys() {
+            return new String[] {OAK_DISCOVERYLITE_CLUSTERVIEW};
+        }
+
+        @Override
+        public boolean isStandardDescriptor(String key) {
+            // the discovery-lite clusterview is the sole descriptor
+            // provided by this Descriptors implementation, and it is
+            // considered a standard descriptor
+            return OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key);
+        }
+
+        @Override
+        public boolean isSingleValueDescriptor(String key) {
+            // the clusterview descriptor carries a single value; any
+            // other key is unknown to this Descriptors implementation
+            // and is therefore reported as not single-valued
+            return OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key);
+        }
+
+        @Override
+        public Value getValue(String key) {
+            if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
+                return null;
+            }
+            return factory.createValue(getClusterViewAsDescriptorValue());
+        }
+
+        @Override
+        public Value[] getValues(String key) {
+            if (!OAK_DISCOVERYLITE_CLUSTERVIEW.equals(key)) {
+                return null;
+            }
+            return new Value[] {getValue(key)};
+        }
+        
+    }
+    
+    /** DocumentNodeStore's (hence local) clusterId **/
+    private int clusterNodeId = -1;
+
+    /** the DocumentNodeStore - used to get the active/inactive cluster ids from **/
+    private DocumentNodeStore documentNodeStore;
+
+    /** background job that periodically verifies and updates the clusterView **/
+    private BackgroundWorker backgroundWorker;
+
+    /** the ClusterViewDocument which was used in the last checkView run **/
+    private ClusterViewDocument previousClusterViewDocument;
+    
+    /** the ClusterView that was valid as a result of the previous checkView run **/
+    private ClusterView previousClusterView;
+    
+    /** kept volatile as this is frequently read in contentChanged which is better kept unsynchronized as long as possible **/
+    private volatile boolean hasInstancesWithBacklog;
+    
+    /** Require a static reference to the NodeStore. Note that this implies the service is active for both segment and document **/
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY,
+            policy = ReferencePolicy.STATIC)
+    private volatile NodeStore nodeStore;
+
+    /** inactive nodes that have been so for a while, ie they have no backlog anymore, so no need to check for backlog every time **/
+    private Set<Integer> longTimeInactives = new HashSet<Integer>();
+
+    /** returns the clusterView as a json value for it to be provided via the repository descriptor **/
+    private String getClusterViewAsDescriptorValue() {
+        if (previousClusterView==null) {
+            return null;
+        } else {
+            return previousClusterView.asDescriptorValue();
+        }
+    }
+
+    /** On activate the MongoDiscoveryService tries to start the background job */
+    @Activate
+    public void activate(ComponentContext context) {
+        logger.trace("activate: start");
+
+        // Document vs Segment check
+        if (!(nodeStore instanceof DocumentNodeStore)) {
+            // that is the case when running on SegmentNodeStore - and atm
+            // there's no other simple dependency that could guarantee that 
+            // DocumentDiscoveryLiteService is only activated when we are
+            // indeed running on DocumentNodeStore.
+            // So that's why there's this instanceof check.
+            // and if it fails, then disable the Document-discoveryliteservice
+            logger.info("activate: nodeStore is not a DocumentNodeStore, thus disabling: "+COMPONENT_NAME);
+            context.disableComponent(COMPONENT_NAME);
+            return;
+        }
+
+        // set the ClusterStateChangeListener with the DocumentNodeStore
+        this.documentNodeStore = (DocumentNodeStore) nodeStore;
+        documentNodeStore.setClusterStateChangeListener(this);
+        
+        // retrieve the clusterId
+        clusterNodeId = documentNodeStore.getClusterId();
+        
+        // start the background worker
+        backgroundWorker = new BackgroundWorker();
+        Thread th = new Thread(backgroundWorker, "DiscoveryLite-BackgroundWorker-["+clusterNodeId+"]");
+        th.setDaemon(true);
+        th.start();
+        
+        // register the Descriptors - for Oak to pass it upwards
+        if (context!=null) {
+            OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
+            whiteboard.register(Descriptors.class, new DiscoveryLiteDescriptor(), Collections.emptyMap());
+        }
+        logger.trace("activate: end");
+    }
+    
+    /** On deactivate the background job is stopped - if it was running at all **/
+    @Deactivate
+    protected void deactivate() {
+        logger.trace("deactivate: deactivated");
+
+        if (backgroundWorker!=null) {
+            backgroundWorker.stop();
+            backgroundWorker = null;
+        }
+        logger.trace("deactivate: end");
+    }
+    
+    /**
+     * Checks if anything changed in the current view and updates the service
+     * fields accordingly.
+     * @return true if anything changed or is about to be changed (eg recovery/backlog), false if 
+     * the view is stable
+     */
+    private boolean checkView() {
+        logger.trace("checkView: start");
+        List<ClusterNodeInfoDocument> allClusterNodes = 
+                ClusterNodeInfoDocument.all(documentNodeStore.getDocumentStore());
+        
+        final Map<Integer,ClusterNodeInfoDocument> allNodeIds = new HashMap<Integer,ClusterNodeInfoDocument>();
+        final Map<Integer,ClusterNodeInfoDocument> activeNotTimedOutNodes = new HashMap<Integer,ClusterNodeInfoDocument>();
+        final Map<Integer,ClusterNodeInfoDocument> activeButTimedOutNodes = new HashMap<Integer,ClusterNodeInfoDocument>();
+        final Map<Integer,ClusterNodeInfoDocument> recoveringNodes = new HashMap<Integer,ClusterNodeInfoDocument>();
+        final Map<Integer,ClusterNodeInfoDocument> backlogNodes = new HashMap<Integer,ClusterNodeInfoDocument>();
+        final Map<Integer,ClusterNodeInfoDocument> inactiveNoBacklogNodes = new HashMap<Integer,ClusterNodeInfoDocument>();
+        
+        for (Iterator<ClusterNodeInfoDocument> it = allClusterNodes.iterator(); it.hasNext();) {
+            ClusterNodeInfoDocument clusterNode = it.next();
+            allNodeIds.put(clusterNode.getClusterId(), clusterNode);
+            if (clusterNode.isBeingRecovered()) {
+                recoveringNodes.put(clusterNode.getClusterId(), clusterNode);
+            } else if (!clusterNode.isActive()) {
+                if (hasBacklog(clusterNode)) {
+                    backlogNodes.put(clusterNode.getClusterId(), clusterNode);
+                } else {
+                    inactiveNoBacklogNodes.put(clusterNode.getClusterId(), clusterNode);
+                }
+            } else if (clusterNode.getLeaseEndTime()<System.currentTimeMillis()) {
+                activeButTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
+            } else {
+                activeNotTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
+            }
+        }
+
+        // the current view should now consist of:
+        // activeNotTimedOutNodes and activeButTimedOutNodes!
+        // (reason for including the timedout: they will yet have to 
+        // switch to recovering or inactive - but we DONT KNOW yet.. that's
+        // predicting the future - so so far we have to stick with 
+        // including them in the view)
+        final Map<Integer, ClusterNodeInfoDocument> allActives;
+        allActives = new HashMap<Integer,ClusterNodeInfoDocument>(activeNotTimedOutNodes);
+        allActives.putAll(activeButTimedOutNodes);
+        
+        // terminology:
+        // 'inactivating' are nodes that are either 'recovering' or 'backlog' ones
+        // 'recovering' are nodes for which one node is doing the recover() of lastRevs
+        // 'backlog' ones are nodes that are no longer active, that have finished the
+        // recover() but for which a backgroundRead is still pending to read
+        // the latest root changes.
+
+        logger.debug("checkView: active nodes: {}, timed out nodes: {}, recovering nodes: {}, backlog nodes: {}, inactive nodes: {}, total: {}, hence view nodes: {}",
+                activeNotTimedOutNodes.size(), activeButTimedOutNodes.size(), recoveringNodes.size(), backlogNodes.size(), inactiveNoBacklogNodes.size(), allNodeIds.size(), allActives.size());
+        
+        final ClusterViewDocument originalView = previousClusterViewDocument;
+        final ClusterViewDocument newView = doCheckView(allActives.keySet(), recoveringNodes.keySet(), backlogNodes.keySet(), inactiveNoBacklogNodes.keySet());
+        if (newView==null) {
+            logger.trace("checkView: end. newView: null");
+            return true;
+        }
+        boolean newHasInstancesWithBacklog = recoveringNodes.size()>0 || backlogNodes.size()>0;
+        boolean changed = originalView==null || (newView.getViewSeqNum()!=originalView.getViewSeqNum()) || (newHasInstancesWithBacklog!=hasInstancesWithBacklog);
+        logger.debug("checkView: viewFine: {}, changed: {}, originalView: {}, newView: {}", newView!=null, changed, originalView, newView);
+        
+        if (longTimeInactives.addAll(inactiveNoBacklogNodes.keySet())) {
+            logger.debug("checkView: updated longTimeInactives to {} (inactiveNoBacklogNodes: {})", longTimeInactives, inactiveNoBacklogNodes);
+        }
+        
+        if (changed) {
+            ClusterView v = ClusterView.fromDocument(clusterNodeId, newView, backlogNodes.keySet());
+            final ClusterView previousView = previousClusterView;
+            previousClusterView = v;
+            hasInstancesWithBacklog = newHasInstancesWithBacklog;
+            logger.info("checkView: view changed from: "+previousView+", to: "+v+", hasInstancesWithBacklog: "+hasInstancesWithBacklog);
+            return true;
+        } else {
+            logger.debug("checkView: no changes whatsoever, still at view: "+previousClusterView);
+            return hasInstancesWithBacklog;
+        }
+    }
+    
+    private Revision getLastKnownRevision(int clusterNodeId) {
+        String[] lastKnownRevisions = documentNodeStore.getMBean().getLastKnownRevisions();
+        for (int i = 0; i < lastKnownRevisions.length; i++) {
+            String aLastKnownRevisionStr = lastKnownRevisions[i];
+            String[] split = aLastKnownRevisionStr.split("=");
+            if (split.length==2) {
+                try{
+                    Integer id = Integer.parseInt(split[0]);
+                    if (id==clusterNodeId) {
+                        final Revision lastKnownRev = Revision.fromString(split[1]);
+                        logger.trace("getLastKnownRevision: end. clusterNode: {}, lastKnownRevision: {}", clusterNodeId, lastKnownRev);
+                        return lastKnownRev;
+                    }
+                } catch(NumberFormatException nfe) {
+                    logger.warn("getLastKnownRevision: could not parse integer '"+split[0]+"': "+nfe, nfe);
+                }
+            } else {
+                logger.warn("getLastKnownRevision: cannot parse lastKnownRevision: "+aLastKnownRevisionStr);
+            }
+        }
+        logger.warn("getLastKnownRevision: no lastKnownRevision found for "+clusterNodeId);
+        return null;
+    }
+
+    private boolean hasBacklog(ClusterNodeInfoDocument clusterNode) {
+        if (logger.isTraceEnabled()) {
+            logger.trace("hasBacklog: start. clusterNodeId: {}", clusterNode.getClusterId());
+        }
+        final Revision lastKnownRevision = getLastKnownRevision(clusterNode.getClusterId());
+        if (lastKnownRevision==null) {
+            logger.warn("hasBacklog: no lastKnownRevision found, hence cannot determine backlog for node "+clusterNode.getClusterId());
+            return false;
+        }
+        
+        // The lastKnownRevision is what the local instance has last read/seen from another instance.
+        // This must be compared to what the other instance *actually* has written as the very last thing.
+        // Now the knowledge what the other instance has last written (after recovery) would sit
+        // in the root document - so that could in theory be used. But reading the root document
+        // would have to be done *uncached*. And that's quite a change to what the original
+        // idea was: that the root document would only be read every second, to avoid contention.
+        // So this 'what the other instance has last written' information is retrieved via
+        // a new, dedicated property in the clusterNodes collection: the 'lastWrittenRootRev'.
+        // The 'lastWrittenRootRev' is written by 'UnsavedModifications' during backgroundUpdate
+        // and retrieved here quite regularly (but it should not be a big deal, as the
+        // discovery-lite is the only one reading this field so frequently and it does not
+        // interfere with normal (jcr) nodes at all).
+        String lastWrittenRootRevStr = clusterNode.getLastWrittenRootRev();
+        if (lastWrittenRootRevStr==null) {
+            logger.warn("hasBacklog: node has lastWrittenRootRev=null");
+            return false;
+        }
+        Revision lastWrittenRootRev = Revision.fromString(lastWrittenRootRevStr);
+        if (lastWrittenRootRev==null) {
+            logger.warn("hasBacklog: node has no lastWrittenRootRev: "+clusterNode.getClusterId());
+            return false;
+        }
+        
+        final boolean hasBacklog = Revision.getTimestampDifference(lastKnownRevision, lastWrittenRootRev)<0;
+        if (logger.isDebugEnabled()) {
+            logger.debug("hasBacklog: clusterNodeId: {}, lastKnownRevision: {}, lastWrittenRootRev: {}, hasBacklog: {}", 
+                    clusterNode.getClusterId(), lastKnownRevision, lastWrittenRootRev, hasBacklog);
+        }
+        return hasBacklog;
+    }
+
+
+    private ClusterViewDocument doCheckView(final Set<Integer> activeNodes, 
+            final Set<Integer> recoveringNodes, final Set<Integer> backlogNodes, final Set<Integer> inactiveNodes) {
+        logger.trace("doCheckView: start: activeNodes: {}, recoveringNodes: {}, backlogNodes: {}, inactiveNodes: {}", 
+                activeNodes, recoveringNodes, backlogNodes, inactiveNodes);
+        
+        Set<Integer> allInactives = new HashSet<Integer>();
+        allInactives.addAll(inactiveNodes);
+        allInactives.addAll(backlogNodes);
+        
+        if (activeNodes.size()==0) {
+            // then we have zero active nodes - that's not expected, as it implies our own node is not active
+            // hence handle with care - ie wait until we get an active node
+            logger.warn("doCheckView: empty active ids. activeNodes:{}, recoveringNodes:{}, inactiveNodes:{}", activeNodes, recoveringNodes, inactiveNodes);
+            return null;
+        }
+        ClusterViewDocument newViewOrNull;
+        try{
+            newViewOrNull = ClusterViewDocument.readOrUpdate(documentNodeStore, activeNodes, recoveringNodes, allInactives);
+        } catch(RuntimeException re) {
+            logger.error("doCheckView: RuntimeException: re: "+re, re);
+            return null;
+        } catch(Error er) {
+            logger.error("doCheckView: Error: er: "+er, er);
+            return null;
+        }
+        logger.trace("doCheckView: readOrUpdate result: {}", newViewOrNull);
+        
+        // and now for some verbose documentation and logging:
+        if (newViewOrNull==null) {
+            // then there was a concurrent update of the clusterView
+            // and we should do some quick backoff sleeping
+            logger.debug("doCheckView: newViewOrNull is null: "+newViewOrNull);
+            return null;
+        } else {
+            // otherwise we now hold the newly valid view
+            // it could be the same or different to the previous one, let's check
+            if (previousClusterViewDocument==null) {
+                // oh ok, this is the very first one
+                previousClusterViewDocument = newViewOrNull;
+                logger.debug("doCheckView: end. first ever view: {}", newViewOrNull);
+                return newViewOrNull;
+            } else if (previousClusterViewDocument.getViewSeqNum()==newViewOrNull.getViewSeqNum()) {
+                // that's the normal case: the viewId matches, nothing has changed, we've already
+                // processed the previousClusterView, so:
+                logger.debug("doCheckView: end. seqNum did not change. view: {}", newViewOrNull);
+                return newViewOrNull;
+            } else {
+                // otherwise the view has changed
+                logger.info("doCheckView: view has changed from: {} to: {} - sending event...", previousClusterViewDocument, newViewOrNull);
+                previousClusterViewDocument = newViewOrNull;
+                logger.debug("doCheckView: end. changed view: {}", newViewOrNull);
+                return newViewOrNull;
+            }
+        }
+    }
+    
+    @Override
+    public void handleClusterStateChange() {
+        // handleClusterStateChange is needed to learn about any state change in the clusternodes 
+        // collection asap and being able to react on it - so this will wake up the
+        // backgroundWorker which in turn will - in a separate thread - check the view
+        // and send out events accordingly
+        wakeupBackgroundWorker(WakeupReason.CLUSTER_STATE_CHANGED);
+    }
+
+    private void wakeupBackgroundWorker(WakeupReason wakeupReason) {
+        final BackgroundWorker bw = backgroundWorker;
+        if (bw!=null) {
+            // get a copy of this.hasInstancesWithBacklog for just the code-part in this synchronized
+            final boolean hasInstancesWithBacklog = this.hasInstancesWithBacklog;
+            
+            if (wakeupReason==WakeupReason.BACKGROUND_READ_FINISHED) {
+                // then only forward the notify if 'hasInstancesWithBacklog'
+                // ie, we have anything we could be waiting for - otherwise
+                // we dont need to wakeup the background thread
+                if (!hasInstancesWithBacklog) {
+                    logger.trace("wakeupBackgroundWorker: not waking up backgroundWorker, as we do not have any instances with backlog");
+                    return;
+                }
+            }
+            logger.trace("wakeupBackgroundWorker: waking up backgroundWorker, reason: {} (hasInstancesWithBacklog: {})", 
+                    wakeupReason, hasInstancesWithBacklog);
+            synchronized(bw) {
+                bw.notifyAll();
+            }
+        }
+    }
+
+    /**
+     * <p>
+     * Additionally the DocumentDiscoveryLiteService must be notified when the
+     * background-read has finished - as it could be waiting for a crashed node's recovery
+     * to finish - which it can only do by checking the lastKnownRevision of the crashed
+     * instance - and that check is best done after the background read is just finished
+ * (it could optionally do that just purely time based as well, but going via a listener
+     * is more timely, that's why this approach has been chosen).
+     */
+    @Override
+    public void contentChanged(NodeState root, CommitInfo info) {
+        // contentChanged is only used to react as quickly as possible
+        // when we have instances that have a 'backlog' - ie when instances crashed
+        // and are being recovered - then we must wait until the recovery is finished
+        // AND until the subsequent background read actually reads that instance'
+        // last changes. To catch that moment as quickly as possible,
+        // this contentChanged is used.
+        // Now from the above it also results that this only wakes up the
+        // backgroundWorker if we have any pending 'backlogy instances'
+        // otherwise this is a no-op
+        if (info==null) {
+            // then ignore this as this is likely an external change
+            // note: it could be a compacted change, in which case we should
+            // probably still process it - but we have a 5sec fallback
+            // in the BackgroundWorker to handle that case too,
+            // so:
+            logger.trace("contentChanged: ignoring content change due to commit info being null");
+            return;
+        }
+        logger.trace("contentChanged: handling content changed by waking up worker if necessary");
+        wakeupBackgroundWorker(WakeupReason.BACKGROUND_READ_FINISHED);
+    }
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
index 2f9e93d..f7c7504 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
@@ -337,6 +337,14 @@ public final class DocumentNodeStore
     private final BlobStore blobStore;
 
     /**
+     * The clusterStateChangeListener is invoked on any noticed change in the clusterNodes collection.
+     * <p>
+     * Note that there is no synchronization between setting this one and using it, but arguably
+     * that is not necessary since it will be set at startup time and then never be changed.
+     */
+    private ClusterStateChangeListener clusterStateChangeListener;
+
+    /**
      * The BlobSerializer.
      */
     private final BlobSerializer blobSerializer = new BlobSerializer() {
@@ -1617,7 +1625,8 @@ public final class DocumentNodeStore
         runBackgroundReadOperations();
     }
 
-    private void runBackgroundUpdateOperations() {
+    /** Note: made package-protected for testing purpose, would otherwise be private **/
+    void runBackgroundUpdateOperations() {
         if (isDisposed.get()) {
             return;
         }
@@ -1658,7 +1667,8 @@ public final class DocumentNodeStore
 
     //----------------------< background read operations >----------------------
 
-    private void runBackgroundReadOperations() {
+    /** Note: made package-protected for testing purpose, would otherwise be private **/
+    void runBackgroundReadOperations() {
         if (isDisposed.get()) {
             return;
         }
@@ -1700,8 +1710,11 @@ public final class DocumentNodeStore
     /**
      * Updates the state about cluster nodes in {@link #activeClusterNodes}
      * and {@link #inactiveClusterNodes}.
+     * @return true if the cluster state has changed, false if the cluster state
+     * remained unchanged
      */
-    void updateClusterState() {
+    boolean updateClusterState() {
+        boolean hasChanged = false;
         long now = clock.getTime();
         Set<Integer> inactive = Sets.newHashSet();
         for (ClusterNodeInfoDocument doc : ClusterNodeInfoDocument.all(store)) {
@@ -1709,14 +1722,15 @@ public final class DocumentNodeStore
             if (cId != this.clusterId && !doc.isActive()) {
                 inactive.add(cId);
             } else {
-                activeClusterNodes.put(cId, doc.getLeaseEndTime());
+                hasChanged |= activeClusterNodes.put(cId, doc.getLeaseEndTime())==null;
             }
         }
-        activeClusterNodes.keySet().removeAll(inactive);
-        inactiveClusterNodes.keySet().retainAll(inactive);
+        hasChanged |= activeClusterNodes.keySet().removeAll(inactive);
+        hasChanged |= inactiveClusterNodes.keySet().retainAll(inactive);
         for (Integer clusterId : inactive) {
-            inactiveClusterNodes.putIfAbsent(clusterId, now);
+            hasChanged |= inactiveClusterNodes.putIfAbsent(clusterId, now)==null;
         }
+        return hasChanged;
     }
 
     /**
@@ -2406,6 +2420,15 @@ public final class DocumentNodeStore
         return blobGC;
     }
 
+    void setClusterStateChangeListener(ClusterStateChangeListener clusterStateChangeListener) {
+        this.clusterStateChangeListener = clusterStateChangeListener;
+    }
+
+    void signalClusterStateChange() {
+        if (clusterStateChangeListener!=null) {
+            clusterStateChangeListener.handleClusterStateChange();
+        }
+    }
     //-----------------------------< DocumentNodeStoreMBean >---------------------------------
 
     public DocumentNodeStoreMBean getMBean() {
@@ -2572,10 +2595,16 @@ public final class DocumentNodeStore
 
         @Override
         protected void execute(@Nonnull DocumentNodeStore nodeStore) {
-            if (nodeStore.renewClusterIdLease()) {
-                nodeStore.updateClusterState();
+            // first renew the clusterId lease
+            nodeStore.renewClusterIdLease();
+            
+            // then, independently if the lease had to be updated or not, check the status:
+            if (nodeStore.updateClusterState()) {
+                // then inform the discovery lite listener - if it is registered
+                nodeStore.signalClusterStateChange();
             }
         }
+
     }
 
     public BlobStore getBlobStore() {
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
index 84dc1ce..1475691 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
@@ -278,8 +278,16 @@ public class LastRevRecoveryAgent {
             return recover(suspects.iterator(), clusterId);
         } finally {
             Utils.closeIfCloseable(suspects);
+
             // Relinquish the lock on the recovery for the cluster on the clusterInfo
+            //TODO: in case recover throws a RuntimeException (or Error..) then
+            //      the recovery might have failed, yet the instance is marked
+            //      as 'recovered' (by setting the state to NONE).
+            //      is this really fine here? or should we not retry - or at least
+            //      log the throwable?
             missingLastRevUtil.releaseRecoveryLock(clusterId);
+
+            nodeStore.signalClusterStateChange();
         }
     }
 
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
index f491e41..a6d74cd 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
@@ -214,6 +214,17 @@ class UnsavedModifications {
                 lastRev = null;
             }
         }
+        Revision writtenRootRev = pending.get("/");
+        if (writtenRootRev!=null) {
+            int cid = writtenRootRev.getClusterId();
+            if (store.getDocumentStore().find(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, String.valueOf(cid))!=null) {
+                UpdateOp update = new UpdateOp(String.valueOf(cid), false);
+                update.equals(Document.ID, null, String.valueOf(cid));
+                update.set(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY, writtenRootRev.toString());
+                store.getDocumentStore().findAndUpdate(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, update);
+            }
+        }
+
         stats.write = clock.getTime() - time;
         return stats;
     }
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/util/AggregatingDescriptors.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/util/AggregatingDescriptors.java
new file mode 100644
index 0000000..941a45b
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/util/AggregatingDescriptors.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.util;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+import javax.jcr.Value;
+
+import org.apache.jackrabbit.oak.api.Descriptors;
+
+/**
+ * An AggregatingDescriptors is an implementation of Descriptors
+ * that allows to aggregate multiple Descriptors underneath.
+ * <p>
+ * Descriptors can be added via add(Descriptors).
+ * @see #add(Descriptors)
+ */
+public class AggregatingDescriptors implements Descriptors {
+
+    private List<Descriptors> descriptorsList = new LinkedList<Descriptors>();
+
+    /**
+     * Create an empty AggregatingDescriptors.
+     * <p>
+     * Note: add Descriptors via add(Descriptors)
+     * @see #add(Descriptors)
+     */
+    public AggregatingDescriptors() {
+        // empty
+    }
+    
+    /**
+     * Creates an AggregatingDescriptors that already contains
+     * one Descriptors.
+     * <p>
+     * Additional descriptors can be added via add(Descriptors)
+     * @see #add(Descriptors)
+     */
+    public AggregatingDescriptors(Descriptors descriptors) {
+        add(descriptors);
+    }
+    
+    /**
+     * Add a Descriptors to this AggregatingDescriptors.
+     * @param descriptors the Descriptors to add
+     */
+    public void add(Descriptors descriptors) {
+        descriptorsList.add(descriptors);
+    }
+    
+    /**
+     * Remove a Descriptors from this AggregatingDescriptors.
+     * @param descriptors the Descriptors to remove
+     * @return true if the Descriptors was added in the first place,
+     * false if it was never part of this AggregatingDescriptors
+     */
+    public boolean remove(Descriptors descriptors) {
+        return descriptorsList.remove(descriptors);
+    }
+    
+    @Override
+    public String[] getKeys() {
+        Set<String> keys = new HashSet<String>();
+        for (Iterator<Descriptors> it = descriptorsList.iterator(); it.hasNext();) {
+            Descriptors descriptors = it.next();
+            Collections.addAll(keys, descriptors.getKeys());
+        }
+        return keys.toArray(new String[keys.size()]);
+    }
+
+    @Override
+    public boolean isStandardDescriptor(@Nonnull String key) {
+        for (Iterator<Descriptors> it = descriptorsList.iterator(); it.hasNext();) {
+            Descriptors descriptors = it.next();
+            if (descriptors.isStandardDescriptor(key)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean isSingleValueDescriptor(@Nonnull String key) {
+        for (Iterator<Descriptors> it = descriptorsList.iterator(); it.hasNext();) {
+            Descriptors descriptors = it.next();
+            if (descriptors.isSingleValueDescriptor(key)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @CheckForNull
+    @Override
+    public Value getValue(@Nonnull String key) {
+        for (Iterator<Descriptors> it = descriptorsList.iterator(); it.hasNext();) {
+            Descriptors descriptors = it.next();
+            Value value = descriptors.getValue(key);
+            if (value!=null) {
+                return value;
+            }
+        }
+        return null;
+    }
+
+    @CheckForNull
+    @Override
+    public Value[] getValues(@Nonnull String key) {
+        for (Iterator<Descriptors> it = descriptorsList.iterator(); it.hasNext();) {
+            Descriptors descriptors = it.next();
+            Value[] values = descriptors.getValues(key);
+            if (values!=null) {
+                return values;
+            }
+        }
+        return null;
+    }
+
+}
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java
new file mode 100644
index 0000000..8deab16
--- /dev/null
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Test helper class that is capable to simply creating ClusterView and ClusterViewDocument objs **/
+class ClusterViewBuilder {
+    
+    private final Set<Integer> activeIds = new HashSet<Integer>();
+    private final Set<Integer> recoveringIds = new HashSet<Integer>();
+    private final Set<Integer> backlogIds = new HashSet<Integer>();
+    private final Set<Integer> inactiveIds = new HashSet<Integer>();
+    private final long viewSeqNum;
+    private final String clusterViewId;
+    private final int myId;
+    
+    ClusterViewBuilder(long viewSeqNum, String clusterViewId, int myId) {
+        this.viewSeqNum = viewSeqNum;
+        this.clusterViewId = clusterViewId;
+        this.myId = myId;
+    }
+    
+    public ClusterViewBuilder active(int... instanceIds) {
+        for (int i = 0; i < instanceIds.length; i++) {
+            int anId = instanceIds[i];
+            activeIds.add(anId);
+        }
+        return this;
+    }
+    public ClusterViewBuilder recovering(int... instanceIds) {
+        for (int i = 0; i < instanceIds.length; i++) {
+            int anId = instanceIds[i];
+            recoveringIds.add(anId);
+        }
+        return this;
+    }
+    public ClusterViewBuilder backlogs(int... instanceIds) {
+        for (int i = 0; i < instanceIds.length; i++) {
+            int anId = instanceIds[i];
+            backlogIds.add(anId);
+        }
+        return this;
+    }
+    public ClusterViewBuilder inactive(int... instanceIds) {
+        for (int i = 0; i < instanceIds.length; i++) {
+            int anId = instanceIds[i];
+            inactiveIds.add(anId);
+        }
+        return this;
+    }
+    
+    public ClusterViewDocument asDoc() {
+        /*
+         *  "_id" : "clusterView",
+            "seqNum" : NumberLong(1),
+            "inactive" : null,
+            "clusterViewHistory" : {
+                
+            },
+            "deactivating" : null,
+            "created" : "2015-06-30T08:21:29.393+0200",
+            "clusterViewId" : "882f8926-1112-493a-81a0-f946087b2986",
+            "active" : "1",
+            "creator" : 1,
+            "_modCount" : NumberLong(1)
+         */
+        Document doc = new Document();
+        doc.put(ClusterViewDocument.VIEW_SEQ_NUM_KEY, viewSeqNum);
+        doc.put(ClusterViewDocument.INACTIVE_KEY, asArrayStr(inactiveIds));
+        doc.put(ClusterViewDocument.RECOVERING_KEY, asArrayStr(recoveringIds));
+        doc.put(ClusterViewDocument.ACTIVE_KEY, asArrayStr(activeIds));
+        doc.put(ClusterViewDocument.CLUSTER_VIEW_ID_KEY, clusterViewId);
+        ClusterViewDocument clusterViewDoc = new ClusterViewDocument(doc);
+        return clusterViewDoc;
+    }
+    public ClusterView asView() {
+        return ClusterView.fromDocument(myId, asDoc(), backlogIds);
+    }
+
+    private String asArrayStr(Set<Integer> ids) {
+        return ClusterViewDocument.arrayToCsv(asArray(ids));
+    }
+
+    private Integer[] asArray(Set<Integer> set) {
+        if (set.size()==0) {
+            return null;
+        } else {
+            return set.toArray(new Integer[set.size()]);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocumentTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocumentTest.java
new file mode 100644
index 0000000..2cab7ab
--- /dev/null
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocumentTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.junit.Test;
+
+public class ClusterViewDocumentTest {
+
+    @Test
+    public void testConstructor() {
+        ClusterViewDocument doc = new ClusterViewBuilder(1, "2", 3).active(3,4).asDoc();
+        assertNotNull(doc);
+        assertEquals("2", doc.getClusterViewId());
+        assertEquals(0, doc.getRecoveringIds().size());
+        assertEquals(0, doc.getInactiveIds().size());
+        assertEquals(2, doc.getActiveIds().size());
+        assertTrue(doc.getActiveIds().contains(3));
+        assertTrue(doc.getActiveIds().contains(4));
+
+        doc = new ClusterViewBuilder(1, "2", 3).active(3,4).backlogs(5).inactive(5,6).asDoc();
+        assertNotNull(doc);
+        assertEquals("2", doc.getClusterViewId());
+        assertEquals(0, doc.getRecoveringIds().size());
+        assertEquals(2, doc.getInactiveIds().size());
+        assertEquals(2, doc.getActiveIds().size());
+        assertTrue(doc.getActiveIds().contains(3));
+        assertTrue(doc.getActiveIds().contains(4));
+        assertTrue(doc.getInactiveIds().contains(5));
+        assertTrue(doc.getInactiveIds().contains(6));
+
+        doc = new ClusterViewBuilder(11, "x", 4).active(3,4,5).recovering(6).inactive(7,8).asDoc();
+        assertNotNull(doc);
+        assertEquals(11, doc.getViewSeqNum());
+        assertEquals("x", doc.getClusterViewId());
+        assertEquals(1, doc.getRecoveringIds().size());
+        assertEquals(2, doc.getInactiveIds().size());
+        assertEquals(3, doc.getActiveIds().size());
+        assertTrue(doc.getActiveIds().contains(3));
+        assertTrue(doc.getActiveIds().contains(4));
+        assertTrue(doc.getActiveIds().contains(5));
+        assertTrue(doc.getRecoveringIds().contains(6));
+        assertTrue(doc.getInactiveIds().contains(7));
+        assertTrue(doc.getInactiveIds().contains(8));
+    
+    }
+    
+    @Test
+    public void testReadUpdate() throws Exception {
+        final int localClusterId = 11;
+        final DocumentNodeStore ns = createMK(localClusterId).nodeStore;
+
+        try{
+            ClusterViewDocument.readOrUpdate(ns, null, null, null);
+            fail("should complain");
+        } catch(Exception ok) {
+            // ok
+        }
+
+        try{
+            ClusterViewDocument.readOrUpdate(ns, new HashSet<Integer>(), null, null);
+            fail("should complain");
+        } catch(Exception ok) {
+            // ok
+        }
+
+        Set<Integer> activeIds = new HashSet<Integer>();
+        activeIds.add(2);
+        Set<Integer> recoveringIds = null;
+        Set<Integer> inactiveIds = null;
+        // first ever view:
+        ClusterViewDocument doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+        final String id = doc.getClusterViewId();
+        assertTrue(id!=null && id.length()>0);
+        String createdAt = doc.getCreatedAt();
+        assertTrue(createdAt!=null && createdAt.length()>0);
+        int createdBy = doc.getCreatedBy();
+        assertEquals(localClusterId, createdBy);
+        assertEquals(1, doc.getViewSeqNum());
+        assertEquals(1, doc.getActiveIds().size());
+        assertTrue(doc.getActiveIds().contains(2));
+        assertEquals(0, doc.getRecoveringIds().size());
+        assertEquals(0, doc.getInactiveIds().size());
+        
+        // now let's check if it doesn't change anything when we're not doing any update
+        doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+        assertEquals(1, doc.getViewSeqNum());
+        
+        // and now add a new active id
+        activeIds.add(3);
+        doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+        assertEquals(id, doc.getClusterViewId());
+        createdAt = doc.getCreatedAt();
+        assertTrue(createdAt!=null && createdAt.length()>0);
+        createdBy = doc.getCreatedBy();
+        assertEquals(localClusterId, createdBy);
+        assertEquals(2, doc.getViewSeqNum());
+        assertEquals(2, doc.getActiveIds().size());
+        assertTrue(doc.getActiveIds().contains(2));
+        assertTrue(doc.getActiveIds().contains(3));
+        assertEquals(0, doc.getRecoveringIds().size());
+        assertEquals(0, doc.getInactiveIds().size());        
+
+        // now let's check if it doesn't change anything when we're not doing any update
+        doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+        assertEquals(2, doc.getViewSeqNum());
+
+        // and now add a new recovering id
+        recoveringIds = new HashSet<Integer>();
+        recoveringIds.add(4);
+        doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+        assertEquals(id, doc.getClusterViewId());
+        createdAt = doc.getCreatedAt();
+        assertTrue(createdAt!=null && createdAt.length()>0);
+        createdBy = doc.getCreatedBy();
+        assertEquals(localClusterId, createdBy);
+        assertEquals(3, doc.getViewSeqNum());
+        assertEquals(2, doc.getActiveIds().size());
+        assertTrue(doc.getActiveIds().contains(2));
+        assertTrue(doc.getActiveIds().contains(3));
+        assertEquals(1, doc.getRecoveringIds().size());
+        assertTrue(doc.getRecoveringIds().contains(4));
+        assertEquals(0, doc.getInactiveIds().size());
+    
+        // now let's check if it doesn't change anything when we're not doing any update
+        doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+        assertEquals(3, doc.getViewSeqNum());
+
+        // and now move that one to inactive
+        recoveringIds = new HashSet<Integer>();
+        inactiveIds = new HashSet<Integer>();
+        inactiveIds.add(4);
+        doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+        assertEquals(id, doc.getClusterViewId());
+        createdAt = doc.getCreatedAt();
+        assertTrue(createdAt!=null && createdAt.length()>0);
+        createdBy = doc.getCreatedBy();
+        assertEquals(localClusterId, createdBy);
+        assertEquals(4, doc.getViewSeqNum());
+        assertEquals(2, doc.getActiveIds().size());
+        assertTrue(doc.getActiveIds().contains(2));
+        assertTrue(doc.getActiveIds().contains(3));
+        assertEquals(0, doc.getRecoveringIds().size());
+        assertEquals(1, doc.getInactiveIds().size());
+        assertTrue(doc.getInactiveIds().contains(4));
+
+        // now let's check if it doesn't change anything when we're not doing any update
+        doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+        assertEquals(4, doc.getViewSeqNum());
+        doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+        assertEquals(4, doc.getViewSeqNum());
+        doc = ClusterViewDocument.readOrUpdate(ns, activeIds, recoveringIds, inactiveIds);
+        assertEquals(4, doc.getViewSeqNum());
+    }
+    
+    private static DocumentMK createMK(int clusterId){
+        return create(new MemoryDocumentStore(), clusterId);
+    }
+
+    private static DocumentMK create(DocumentStore ds, int clusterId){
+        return new DocumentMK.Builder()
+                .setAsyncDelay(0)
+                .setDocumentStore(ds)
+                .setClusterId(clusterId)
+                .setPersistentCache("target/persistentCache,time")
+                .open();
+    }
+    
+}
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewTest.java
new file mode 100644
index 0000000..8b00822
--- /dev/null
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.jackrabbit.oak.commons.json.JsonObject;
+import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
+import org.junit.Test;
+
+/** Simple paranoia tests for constructor and getters of ClusterViewImpl **/
+public class ClusterViewTest {
+    
+    @Test
+    public void testConstructor() throws Exception {
+        final Integer viewId = 3;
+        final String clusterViewId = UUID.randomUUID().toString();
+        final Integer instanceId = 2;
+        final Set<Integer> emptyInstanceIds = new HashSet<Integer>();
+        final Set<Integer> instanceIds = new HashSet<Integer>();
+        instanceIds.add(1);
+        final Set<Integer> deactivating = new HashSet<Integer>();
+        final Set<Integer> inactive = new HashSet<Integer>();
+        try{
+            new ClusterView(-1, true, clusterViewId, instanceId, instanceIds, deactivating, inactive);
+            fail("should complain");
+        } catch(IllegalStateException e) {
+            // ok
+        }
+        try{
+            new ClusterView(viewId, true, null, instanceId, instanceIds, deactivating, inactive);
+            fail("should complain");
+        } catch(IllegalStateException e) {
+            // ok
+        }
+        try{
+            new ClusterView(viewId, true, clusterViewId, -1, instanceIds, deactivating, inactive);
+            fail("should complain");
+        } catch(IllegalStateException e) {
+            // ok
+        }
+        try{
+            new ClusterView(viewId, true, clusterViewId, instanceId, emptyInstanceIds, deactivating, inactive);
+            fail("should complain");
+        } catch(IllegalStateException e) {
+            // ok
+        }
+        try{
+            new ClusterView(viewId, true, clusterViewId, instanceId, null, deactivating, inactive);
+            fail("should complain");
+        } catch(IllegalStateException e) {
+            // ok
+        }
+        try{
+            new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, null, inactive);
+            fail("should complain");
+        } catch(Exception e) {
+            // ok
+        }
+        try{
+            new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, deactivating, null);
+            fail("should complain");
+        } catch(Exception e) {
+            // ok
+        }
+        final Set<Integer> nonEmptyDeactivating = new HashSet<Integer>();
+        nonEmptyDeactivating.add(3);
+        new ClusterView(viewId, false, clusterViewId, instanceId, instanceIds, nonEmptyDeactivating, inactive);
+        new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, nonEmptyDeactivating, inactive);
+        // should not complain about:
+        new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, deactivating, inactive);
+    }
+    
+    @Test
+    public void testGetters() throws Exception {
+        final Integer viewId = 3;
+        final String clusterViewId = UUID.randomUUID().toString();
+        final Set<Integer> instanceIds = new HashSet<Integer>();
+        instanceIds.add(1);
+        final Integer instanceId = 2;
+        final Set<Integer> deactivating = new HashSet<Integer>();
+        final Set<Integer> inactive = new HashSet<Integer>();
+        final ClusterView cv = new ClusterView(viewId, true, clusterViewId, instanceId, instanceIds, deactivating, inactive);
+        assertNotNull(cv);
+        assertTrue(cv.asDescriptorValue().length()>0);
+        assertTrue(cv.toString().length()>0);
+    }
+    
+    @Test
+    public void testOneActiveOnly() throws Exception {
+        String clusterViewId = UUID.randomUUID().toString();
+        ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 21);
+        ClusterView view = builder.active(21).asView();
+        
+        // e.g. {"seq":10,"id":"<uuid>","me":21,"active":[21],"deactivating":[],"inactive":[]}
+        JsonObject o = asJsonObject(view);
+        Map<String, String> props = o.getProperties();
+        assertEquals("10", props.get("seq"));
+        assertEquals(clusterViewId, unwrapString(props.get("id")));
+        assertEquals("21", props.get("me"));
+        assertEquals(asJsonArray(21), props.get("active"));
+        assertEquals(asJsonArray(), props.get("deactivating"));
+        assertEquals(asJsonArray(), props.get("inactive"));
+    }
+    
+    @Test
+    public void testOneActiveOneInactive() throws Exception {
+        String clusterViewId = UUID.randomUUID().toString();
+        ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2);
+        ClusterView view = builder.active(2).inactive(3).asView();
+        
+        // {"seq":10,"id":"35f60ed3-508d-4a81-b812-89f07f57db20","me":2,"active":[2],"deactivating":[],"inactive":[3]}
+        JsonObject o = asJsonObject(view);
+        Map<String, String> props = o.getProperties();
+        assertEquals("10", props.get("seq"));
+        assertEquals(clusterViewId, unwrapString(props.get("id")));
+        assertEquals("2", props.get("me"));
+        assertEquals(asJsonArray(2), props.get("active"));
+        assertEquals(asJsonArray(), props.get("deactivating"));
+        assertEquals(asJsonArray(3), props.get("inactive"));
+    }
+    
+    @Test
+    public void testSeveralActiveOneInactive() throws Exception {
+        String clusterViewId = UUID.randomUUID().toString();
+        ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2);
+        ClusterView view = builder.active(2,5,6).inactive(3).asView();
+        
+        // e.g. {"seq":10,"id":"<uuid>","me":2,"active":[2,5,6],"deactivating":[],"inactive":[3]}
+        JsonObject o = asJsonObject(view);
+        Map<String, String> props = o.getProperties();
+        assertEquals("10", props.get("seq"));
+        assertEquals("true", props.get("final"));
+        assertEquals(clusterViewId, unwrapString(props.get("id")));
+        assertEquals("2", props.get("me"));
+        assertEquals(asJsonArray(2,5,6), props.get("active"));
+        assertEquals(asJsonArray(), props.get("deactivating"));
+        assertEquals(asJsonArray(3), props.get("inactive"));
+    }
+    
+    @Test
+    public void testOneActiveSeveralInactive() throws Exception {
+        String clusterViewId = UUID.randomUUID().toString();
+        ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2);
+        ClusterView view = builder.active(2).inactive(3,4,5,6).asView();
+        
+        // e.g. {"seq":10,"id":"<uuid>","me":2,"active":[2],"deactivating":[],"inactive":[3,4,5,6]}
+        JsonObject o = asJsonObject(view);
+        Map<String, String> props = o.getProperties();
+        assertEquals("10", props.get("seq"));
+        assertEquals("true", props.get("final"));
+        assertEquals(clusterViewId, unwrapString(props.get("id")));
+        assertEquals("2", props.get("me"));
+        assertEquals(asJsonArray(2), props.get("active"));
+        assertEquals(asJsonArray(), props.get("deactivating"));
+        assertEquals(asJsonArray(3,4,5,6), props.get("inactive"));
+    }
+    
+    @Test
+    public void testWithRecoveringOnly() throws Exception {
+        String clusterViewId = UUID.randomUUID().toString();
+        ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2);
+        ClusterView view = builder.active(2,3).recovering(4).inactive(5,6).asView();
+        
+        JsonObject o = asJsonObject(view);
+        Map<String, String> props = o.getProperties();
+        assertEquals("10", props.get("seq"));
+        assertEquals("true", props.get("final"));
+        assertEquals(clusterViewId, unwrapString(props.get("id")));
+        assertEquals("2", props.get("me"));
+        assertEquals(asJsonArray(2,3), props.get("active"));
+        assertEquals(asJsonArray(4), props.get("deactivating"));
+        assertEquals(asJsonArray(5,6), props.get("inactive"));
+    }
+
+    @Test
+    public void testWithRecoveringAndBacklog() throws Exception {
+        String clusterViewId = UUID.randomUUID().toString();
+        ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2);
+        ClusterView view = builder.active(2,3).recovering(4).inactive(5,6).backlogs(5).asView();
+        
+        JsonObject o = asJsonObject(view);
+        Map<String, String> props = o.getProperties();
+        assertEquals("10", props.get("seq"));
+        assertEquals(clusterViewId, unwrapString(props.get("id")));
+        assertEquals("2", props.get("me"));
+        assertEquals("false", props.get("final"));
+        assertEquals(asJsonArray(2,3), props.get("active"));
+        assertEquals(asJsonArray(4,5), props.get("deactivating"));
+        assertEquals(asJsonArray(6), props.get("inactive"));
+    }
+
+    @Test
+    public void testBacklogButNotInactive() throws Exception {
+        String clusterViewId = UUID.randomUUID().toString();
+        ClusterViewBuilder builder = new ClusterViewBuilder(10, clusterViewId, 2);
+        try{
+            ClusterView view = builder.active(2,3).backlogs(5).asView();
+            fail("should complain");
+        } catch(Exception ok) {
+            // ok
+        }
+    }
+
+    private JsonObject asJsonObject(final ClusterView view) {
+        final String json = view.asDescriptorValue();
+        System.out.println(json);
+        JsopTokenizer t = new JsopTokenizer(json);
+        t.read('{');
+        JsonObject o = JsonObject.create(t);
+        return o;
+    }
+
+    private String unwrapString(String stringWithQuotes) {
+        // TODO: JsonObject currently returns string values with the surrounding quote
+        // characters included - verify whether that is intended or a parser bug
+        assertTrue(stringWithQuotes.startsWith("\""));
+        assertTrue(stringWithQuotes.endsWith("\""));
+        return stringWithQuotes.substring(1, stringWithQuotes.length()-1);
+    }
+
+    static String asJsonArray(int... ids) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        for (int i = 0; i < ids.length; i++) {
+            int anId = ids[i];
+            if (i!=0) {
+                sb.append(",");
+            }
+            sb.append(String.valueOf(anId));
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+}
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceTest.java
new file mode 100644
index 0000000..70bcc4a
--- /dev/null
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentDiscoveryLiteServiceTest.java
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jcr.PropertyType;
+import javax.jcr.Value;
+import javax.jcr.ValueFormatException;
+
+import junitx.util.PrivateAccessor;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Descriptors;
+import org.apache.jackrabbit.oak.commons.json.JsonObject;
+import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
+import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.mongodb.DB;
+
+/**
+ * Tests for the DocumentDiscoveryLiteService
+ */
+public class DocumentDiscoveryLiteServiceTest {
+    
+    /** container for what should represent an instance, but is not a complete one, hence 'simplified'.
+     * it contains most importantly the DocumentNodeStore and the discoveryLite service
+     */
+    class SimplifiedInstance {
+        
+        private DocumentDiscoveryLiteService service;
+        private DocumentNodeStore ns;
+        private final Descriptors descriptors;
+        private Map<String,Object> registeredServices;
+        private final long lastRevInterval;
+        private volatile boolean lastRevStopped = false;
+        private volatile boolean writeSimulationStopped = false;
+        private Thread lastRevThread;
+        private Thread writeSimulationThread;
+        public String workingDir;
+        
+        SimplifiedInstance(DocumentDiscoveryLiteService service, 
+                DocumentNodeStore ns, Descriptors descriptors, 
+                Map<String,Object> registeredServices, long lastRevInterval,
+                String workingDir) {
+            this.service = service;
+            this.ns = ns;
+            this.workingDir = workingDir;
+            this.descriptors = descriptors;
+            this.registeredServices = registeredServices;
+            this.lastRevInterval = lastRevInterval;
+            if (lastRevInterval>0) {
+                startLastRevThread();
+            }
+        }
+        
+        @Override
+        public String toString() {
+            return "SimplifiedInstance[cid="+ns.getClusterId()+"]";
+        }
+
+        private void startLastRevThread() {
+            lastRevStopped = false;
+            lastRevThread = new Thread(new Runnable() {
+   
+                @Override
+                public void run() {
+                    while(!lastRevStopped) {
+                        SimplifiedInstance.this.ns.getLastRevRecoveryAgent().performRecoveryIfNeeded();                   
+                        try {
+                            Thread.sleep(SimplifiedInstance.this.lastRevInterval);
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+                
+            });
+            lastRevThread.setDaemon(true);
+            lastRevThread.setName("lastRevThread[cid="+ns.getClusterId()+"]");
+            lastRevThread.start();
+        }
+        
+        void stopLastRevThread() throws InterruptedException {
+            lastRevStopped = true;
+            lastRevThread.join();
+        }
+        
+        boolean isFinal() throws Exception {
+            final JsonObject clusterViewObj = getClusterViewObj();
+            if (clusterViewObj==null) {
+                throw new IllegalStateException("should always have that final flag set");
+            }
+            
+            String finalStr = clusterViewObj.getProperties().get("final");
+
+            return Boolean.valueOf(finalStr);
+        }
+
+        boolean hasActiveIds(String clusterViewStr, int... expected) throws Exception {
+            return hasIds(clusterViewStr, "active", expected);
+        }
+
+        boolean hasDeactivatingIds(String clusterViewStr, int... expected) throws Exception {
+            return hasIds(clusterViewStr, "deactivating", expected);
+        }
+
+        boolean hasInactiveIds(String clusterViewStr, int... expected) throws Exception {
+            return hasIds(clusterViewStr, "inactive", expected);
+        }
+
+        private boolean hasIds(final String clusterViewStr, final String key, int... expectedIds)
+                throws Exception {
+            final JsonObject clusterViewObj = asJsonObject(clusterViewStr);
+            String actualIdsStr = clusterViewObj==null ? null : clusterViewObj.getProperties().get(key);
+            
+            boolean actualEmpty = actualIdsStr==null || actualIdsStr.length()==0 || actualIdsStr.equals("[]");
+            boolean expectedEmpty = expectedIds==null || expectedIds.length==0;
+            
+            if (actualEmpty && expectedEmpty) {
+                return true;
+            }
+            if (actualEmpty != expectedEmpty) {
+                return false;
+            }
+            
+            final List<Integer> actualList = Arrays.asList(ClusterViewDocument.csvToIntegerArray(actualIdsStr.substring(1, actualIdsStr.length()-1)));
+            if (expectedIds.length!=actualList.size()) {
+                return false;
+            }
+            for (int i = 0; i < expectedIds.length; i++) {
+                int anExpectedId = expectedIds[i];
+                if (!actualList.contains(anExpectedId)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        
+        JsonObject getClusterViewObj() throws Exception {
+            final String json = getClusterViewStr();
+            return asJsonObject(json);
+        }
+
+        private JsonObject asJsonObject(final String json) {
+            if (json==null) {
+                return null;
+            }
+            JsopTokenizer t = new JsopTokenizer(json);
+            t.read('{');
+            JsonObject o = JsonObject.create(t);
+            return o;
+        }
+
+        String getClusterViewStr() throws Exception {
+            return getDescriptor(DocumentDiscoveryLiteService.OAK_DISCOVERYLITE_CLUSTERVIEW);
+        }
+        
+        String getDescriptor(String key) throws Exception {
+            final Value value = descriptors.getValue(key);
+            if (value==null) {
+                return null;
+            }
+            if (value.getType()!=PropertyType.STRING) {
+                return null;
+            }
+            try{
+                return value.getString();
+            } catch(ValueFormatException vfe) {
+                return null;
+            }
+        }
+        
+        public void dispose() {
+            logger.info("Disposing "+this);
+            try {
+                stopSimulatingWrites();
+            } catch (InterruptedException e) {
+                fail("interrupted");
+            }
+            if (lastRevThread!=null) {
+                try{
+                    stopLastRevThread();
+                } catch(InterruptedException ok) {
+                    fail("interrupted");
+                }
+                lastRevThread = null;
+            }
+            if (service!=null) {
+                service.deactivate();
+                service = null;
+            }
+            if (ns!=null) {
+                ns.dispose();
+                ns = null;
+            }
+            if (registeredServices!=null) {
+                registeredServices.clear();
+                registeredServices = null;
+            }
+        }
+
+        /** shutdown simulates the normal, graceful, shutdown 
+         * @throws InterruptedException */
+        public void shutdown() throws InterruptedException {
+            stopSimulatingWrites();
+            stopLastRevThread();
+            ns.dispose();
+            service.deactivate();
+        }
+
+        /** crash simulates a kill -9, sort of 
+         * @throws Throwable */
+        public void crash() throws Throwable {
+            logger.info("crash: stopping simulating writes...");
+            stopSimulatingWrites();
+            logger.info("crash: stopping lastrev thread...");
+            stopLastRevThread();
+            logger.info("crash: stopped lastrev thread, now setting least to end within 1 sec");
+
+            boolean renewed = setLeaseTime(1000 /* 1 sec */);
+            if (!renewed) {
+                logger.info("halt");
+                fail("did not renew clusterid lease");
+            }
+            
+            logger.info("crash: now stopping background read/update");
+            stopAllBackgroundThreads();
+            // but don't do the following from DocumentNodeStore.dispose():
+            // * don't do the last internalRunBackgroundUpdateOperations - as we're trying to simulate a crash here
+            // * don't dispose clusterNodeInfo to leave the node in active state
+            
+            // the DocumentDiscoveryLiteService currently can simply be deactivated, doesn't differ much from crashing
+            service.deactivate();
+            logger.info("crash: crash simulation done.");
+        }
+
+        /**
+         * very hacky way of doing the following:
+         * make sure this instance's clusterNodes entry is marked with a very short (1 sec off) lease end time
+         * so that the crash detection doesn't take a minute (as it would by default)
+         */
+        private boolean setLeaseTime(final int leaseTime)
+                throws NoSuchFieldException {
+            ns.getClusterInfo().setLeaseTime(leaseTime);
+            PrivateAccessor.setField(ns.getClusterInfo(), "leaseEndTime", System.currentTimeMillis()+(leaseTime/2));
+            boolean renewed = ns.renewClusterIdLease();
+            return renewed;
+        }
+        
+        private AtomicBoolean getIsDisposed() throws NoSuchFieldException {
+            AtomicBoolean isDisposed = (AtomicBoolean) PrivateAccessor.getField(ns, "isDisposed");
+            return isDisposed;
+        }
+
+        private void stopAllBackgroundThreads() throws NoSuchFieldException {
+            // get all those background threads...
+            Thread backgroundReadThread = (Thread) PrivateAccessor.getField(ns, "backgroundReadThread");
+            assertNotNull(backgroundReadThread);
+            Thread backgroundUpdateThread = (Thread) PrivateAccessor.getField(ns, "backgroundUpdateThread");
+            assertNotNull(backgroundUpdateThread);
+            Thread leaseUpdateThread = (Thread) PrivateAccessor.getField(ns, "leaseUpdateThread");
+            assertNotNull(leaseUpdateThread);
+            
+            // start doing what DocumentNodeStore.dispose() would do - except do it very fine controlled, basically:
+            // make sure to stop backgroundReadThread, backgroundUpdateThread and leaseUpdateThread
+            // but then nothing else.
+            final AtomicBoolean isDisposed = getIsDisposed();
+            assertFalse(isDisposed.getAndSet(true));
+            // notify background threads waiting on isDisposed
+            synchronized (isDisposed) {
+                isDisposed.notifyAll();
+            }
+            try {
+                backgroundReadThread.join(5000);
+                assertTrue(!backgroundReadThread.isAlive());
+            } catch (InterruptedException e) {
+                // ignore
+            }
+            try {
+                backgroundUpdateThread.join(5000);
+                assertTrue(!backgroundUpdateThread.isAlive());
+            } catch (InterruptedException e) {
+                // ignore
+            }
+            try {
+                leaseUpdateThread.join(5000);
+                assertTrue(!leaseUpdateThread.isAlive());
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+
+        public void stopBgReadThread() throws NoSuchFieldException {
+            final Thread backgroundReadThread = (Thread) PrivateAccessor.getField(ns, "backgroundReadThread");
+            assertNotNull(backgroundReadThread);
+            final Runnable bgReadRunnable = (Runnable) PrivateAccessor.getField(backgroundReadThread, "target");
+            assertNotNull(bgReadRunnable);
+            final AtomicBoolean bgReadIsDisposed = new AtomicBoolean(false);
+            PrivateAccessor.setField(bgReadRunnable, "isDisposed", bgReadIsDisposed);
+            assertFalse(bgReadIsDisposed.getAndSet(true));
+            try {
+                backgroundReadThread.join(5000);
+                assertTrue(!backgroundReadThread.isAlive());
+            } catch (InterruptedException e) {
+                // ignore
+            }
+            // bit of heavy work, but now the backgroundReadThread is stopped and all the others are still running
+        }
+
+        public void addNode(String path) throws CommitFailedException {
+            NodeBuilder root = ns.getRoot().builder();
+            NodeBuilder child = root;
+            String[] split = path.split("/");
+            for(int i=1; i<split.length; i++) {
+                child = child.child(split[i]);
+            }
+            logger.info("addNode: "+ns.getClusterId()+" is merging path "+path);
+            ns.merge(root, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        }
+
+        /** Sets key=value on the node at the given absolute path and merges the change. */
+        public void setProperty(String path, String key, String value) throws CommitFailedException {
+            NodeBuilder rootBuilder = ns.getRoot().builder();
+            NodeBuilder current = rootBuilder;
+            String[] segments = path.split("/");
+            int idx = 1;
+            while (idx < segments.length) {
+                current = current.child(segments[idx]);
+                idx++;
+            }
+            current.setProperty(key, value);
+            logger.info("setProperty: "+ns.getClusterId()+" is merging path/property "+path+", key="+key+", value="+value);
+            ns.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+        }
+
+        /**
+         * Shortens the lease time to the given value and backdates the private
+         * leaseEndTime field so the lease is immediately considered due.
+         * NOTE(review): method name looks like a typo for "setLeaseTimeout" - kept
+         * as-is because callers in this test reference it by this name.
+         */
+        public void setLeastTimeout(long timeoutInMs) throws NoSuchFieldException {
+            ns.getClusterInfo().setLeaseTime(timeoutInMs);
+            PrivateAccessor.setField(ns.getClusterInfo(), "leaseEndTime", System.currentTimeMillis()-1000);
+        }
+
+        /**
+         * Starts a daemon thread that keeps adding random nodes (via addNode)
+         * until stopSimulatingWrites() is called.
+         *
+         * @param writeInterval pause in milliseconds between two simulated writes
+         */
+        private void startSimulatingWrites(final long writeInterval) {
+            writeSimulationStopped = false;
+            writeSimulationThread = new Thread(new Runnable() {
+
+                final Random random = new Random();
+
+                @Override
+                public void run() {
+                    while(!writeSimulationStopped) {
+                        try {
+                            writeSomething();
+                            // bugfix: honour the writeInterval parameter - previously the
+                            // unrelated lastRevInterval field was used and the parameter ignored
+                            Thread.sleep(writeInterval);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+
+                // writes one random node under this instance's clusterId subtree
+                private void writeSomething() throws CommitFailedException {
+                    final String path = "/"+ns.getClusterId()+"/"+random.nextInt(100)+"/"+random.nextInt(100)+"/"+random.nextInt(100);
+                    logger.info("Writing ["+ns.getClusterId()+"]"+path);
+                    addNode(path);
+                }
+
+            });
+            writeSimulationThread.setDaemon(true);
+            writeSimulationThread.start();
+        }
+        
+        void stopSimulatingWrites() throws InterruptedException {
+            writeSimulationStopped = true;
+            if (writeSimulationThread!=null) {
+                writeSimulationThread.join();
+            }
+        }
+        
+    }
+    
+    /** A condition that waitFor() polls repeatedly until fulfilled or a timeout elapses. */
+    interface Expectation {
+        /** check if the expectation is fulfilled: return null if it is, or a descriptive error msg if not **/
+        String fulfilled() throws Exception;
+    }
+    
+    /**
+     * Expectation on the discovery-lite cluster view (active/deactivating/inactive
+     * ids plus the 'final' flag) as observed through one SimplifiedInstance.
+     * NOTE(review): the ids arrays stay null unless the corresponding setter is
+     * called - the has*Ids checks are expected to treat null as 'no expectation'.
+     */
+    class ViewExpectation implements Expectation {
+
+        private int[] activeIds;
+        private int[] deactivatingIds;
+        private int[] inactiveIds;
+        private final SimplifiedInstance discoveryLiteCombo;
+        private boolean isFinal = true;
+
+        ViewExpectation(SimplifiedInstance discoveryLiteCombo) {
+            this.discoveryLiteCombo = discoveryLiteCombo;
+        }
+
+        private int[] asIntArray(Integer[] arr) {
+            int[] result = new int[arr.length];
+            for(int i=0; i<arr.length; i++) {
+                result[i] = arr[i];
+            }
+            return result;
+        }
+
+        void setActiveIds(Integer[] activeIds) {
+            this.activeIds = asIntArray(activeIds);
+        }
+
+        void setActiveIds(int... activeIds) {
+            this.activeIds = activeIds;
+        }
+
+        void setDeactivatingIds(int... deactivatingIds) {
+            this.deactivatingIds = deactivatingIds;
+        }
+
+        void setInactiveIds(Integer[] inactiveIds) {
+            this.inactiveIds = asIntArray(inactiveIds);
+        }
+
+        void setInactiveIds(int... inactiveIds) {
+            this.inactiveIds = inactiveIds;
+        }
+
+        @Override
+        public String fulfilled() throws Exception {
+            final String clusterViewStr = discoveryLiteCombo.getClusterViewStr();
+            if (clusterViewStr==null) {
+                if (activeIds.length!=0) {
+                    return "no clusterView, but expected activeIds: "+beautify(activeIds);
+                }
+                if (deactivatingIds.length!=0) {
+                    return "no clusterView, but expected deactivatingIds: "+beautify(deactivatingIds);
+                }
+                if (inactiveIds.length!=0) {
+                    return "no clusterView, but expected inactiveIds: "+beautify(inactiveIds);
+                }
+                // bugfix: no clusterView and nothing expected means the expectation
+                // is met - previously this fell through and handed a null
+                // clusterViewStr to the has*Ids checks below
+                return null;
+            }
+            if (!discoveryLiteCombo.hasActiveIds(clusterViewStr, activeIds)) {
+                return "activeIds dont match, expected: "+beautify(activeIds)+", got clusterView: "+clusterViewStr;
+            }
+            if (!discoveryLiteCombo.hasDeactivatingIds(clusterViewStr, deactivatingIds)) {
+                return "deactivatingIds dont match, expected: "+beautify(deactivatingIds)+", got clusterView: "+clusterViewStr;
+            }
+            if (!discoveryLiteCombo.hasInactiveIds(clusterViewStr, inactiveIds)) {
+                return "inactiveIds dont match, expected: "+beautify(inactiveIds)+", got clusterView: "+clusterViewStr;
+            }
+            if (discoveryLiteCombo.isFinal()!=isFinal) {
+                return "final flag does not match. expected: "+isFinal+", but is: "+discoveryLiteCombo.isFinal();
+            }
+            return null;
+        }
+
+        /** renders an id array as a comma-separated list; null is rendered as "" */
+        private String beautify(int[] ids) {
+            if (ids==null) {
+                return "";
+            }
+            // StringBuilder instead of StringBuffer - no synchronization needed here
+            StringBuilder sb = new StringBuilder();
+            for(int i=0; i<ids.length; i++) {
+                if (i!=0) {
+                    sb.append(",");
+                }
+                sb.append(ids[i]);
+            }
+            return sb.toString();
+        }
+
+        public void setFinal(boolean isFinal) {
+            this.isFinal = isFinal;
+        }
+
+    }
+
+    // toggle: true runs against a real MongoDB, false uses the in-memory document store
+    private static final boolean MONGO_DB = true;
+
+    private List<DocumentMK> mks = Lists.newArrayList();
+    // only used when MONGO_DB == false
+    private MemoryDocumentStore ds;
+    private MemoryBlobStore bs;
+
+    final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    // every SimplifiedInstance ever created by this test; disposed in clear()
+    private List<SimplifiedInstance> allInstances = new LinkedList<SimplifiedInstance>();
+    
+    /** Activation must self-disable without a nodeStore and stay enabled with one. */
+    @Test
+    public void testActivateDeactivate() throws Exception {
+        // test the variant where we're not running with DocumentNodeStore
+        DocumentDiscoveryLiteService discoveryLite = new DocumentDiscoveryLiteService();
+        ComponentContext c = mock(ComponentContext.class);
+        discoveryLite.activate(c);
+        // without a nodeStore the component must disable itself
+        verify(c, times(1)).disableComponent(DocumentDiscoveryLiteService.COMPONENT_NAME);
+
+        // then test normal start with a DocumentNodeStore
+        DocumentMK mk1 = createMK(1, 0);
+        discoveryLite = new DocumentDiscoveryLiteService();
+        // inject the nodeStore via reflection (field is private)
+        PrivateAccessor.setField(discoveryLite, "nodeStore", mk1.nodeStore);
+        BundleContext bc = mock(BundleContext.class);
+        c = mock(ComponentContext.class);
+        when(c.getBundleContext()).thenReturn(bc);
+        discoveryLite.activate(c);
+        // with a nodeStore present the component must NOT be disabled
+        verify(c, times(0)).disableComponent(DocumentDiscoveryLiteService.COMPONENT_NAME);
+        discoveryLite.deactivate();
+    }
+    
+    /** Borrowed from http://stackoverflow.com/questions/3301635/change-private-static-final-field-using-java-reflection
+     * NOTE(review): relies on the Field "modifiers" field being reflectively
+     * writable; this no longer works out of the box on JDK 12+. */
+    static void setFinalStatic(Field field, Object newValue) throws Exception {
+        field.setAccessible(true);
+
+        // strip the FINAL modifier so the static field becomes writable
+        Field modifiersField = Field.class.getDeclaredField("modifiers");
+        modifiersField.setAccessible(true);
+        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+
+        field.set(null, newValue);
+    }
+    
+    // subsequent tests should get a DocumentDiscoveryLiteService setup from the start
+    private DocumentNodeStore createNodeStore(String workingDir) throws SecurityException, Exception {
+        // ensure that we always get a fresh cluster[node]id
+        System.setProperty("user.dir", workingDir);
+        setFinalStatic(ClusterNodeInfo.class.getDeclaredField("WORKING_DIR"), workingDir);
+        
+        // then create the DocumentNodeStore
+        DocumentMK mk1 = createMK(0 /* to make sure the clusterNodes collection is used **/, 500 /* asyncDelay: background interval*/);
+        
+        logger.info("createNodeStore: created DocumentNodeStore with cid="+mk1.nodeStore.getClusterId()+", workingDir="+workingDir);
+        return mk1.nodeStore;
+    }
+    
+    /** Creates a SimplifiedInstance with a freshly generated working directory. */
+    private SimplifiedInstance createInstance() throws Exception {
+        return createInstance(UUID.randomUUID().toString());
+    }
+    
+    private SimplifiedInstance createInstance(String workingDir) throws SecurityException, Exception {
+        DocumentNodeStore ns = createNodeStore(workingDir);
+        return createInstance(ns, workingDir);
+    }
+    /**
+     * Wires a DocumentDiscoveryLiteService around the given nodeStore using mocked
+     * OSGi contexts, captures the services it registers (notably the Descriptors)
+     * and wraps everything in a SimplifiedInstance for test-friendly access.
+     */
+    private SimplifiedInstance createInstance(DocumentNodeStore ns, String workingDir) throws NoSuchFieldException {
+        DocumentDiscoveryLiteService discoveryLite = new DocumentDiscoveryLiteService();
+        PrivateAccessor.setField(discoveryLite, "nodeStore", ns);
+        BundleContext bc = mock(BundleContext.class);
+        ComponentContext c = mock(ComponentContext.class);
+        when(c.getBundleContext()).thenReturn(bc);
+        final Map<String,Object> registeredServices = new HashMap<String, Object>();
+        // record every service registration so the Descriptors can be looked up below
+        when(bc.registerService(anyString(), anyObject(), (Properties)anyObject())).then(new Answer<ServiceRegistration>() {
+            @Override
+            public ServiceRegistration answer(InvocationOnMock invocation) {
+                registeredServices.put((String) invocation.getArguments()[0], invocation.getArguments()[1]);
+                return null;
+            }
+        });
+        discoveryLite.activate(c);
+        Descriptors d = (Descriptors) registeredServices.get(Descriptors.class.getName());
+        final SimplifiedInstance result = new SimplifiedInstance(discoveryLite, ns, d, registeredServices, 500, workingDir);
+        allInstances.add(result);
+        logger.info("Created "+result);
+        return result;
+    }
+    
+    /**
+     * Polls the given expectation every 100ms until it is fulfilled (returns null)
+     * or the timeout elapses, in which case the test fails with the given message.
+     */
+    private void waitFor(Expectation expectation, int timeout, String msg) throws Exception {
+        final long deadline = System.currentTimeMillis() + timeout;
+        String failure = expectation.fulfilled();
+        while (failure != null) {
+            if (System.currentTimeMillis() > deadline) {
+                fail("expectation not fulfilled within "+timeout+"ms: "+msg+", fulfillment result: "+failure);
+            }
+            Thread.sleep(100);
+            failure = expectation.fulfilled();
+        }
+    }
+
+    /** A single instance must report itself as the only active cluster member. */
+    @Test
+    public void testOneNode() throws Exception {
+        final SimplifiedInstance single = createInstance();
+        final ViewExpectation selfActive = new ViewExpectation(single);
+        selfActive.setActiveIds(single.ns.getClusterId());
+        waitFor(selfActive, 2000, "see myself as active");
+    }
+
+    /** Two instances see each other as active; after a clean shutdown of the
+     * second, the first reports it as inactive. */
+    @Test
+    public void testTwoNodesWithCleanShutdown() throws Exception {
+        final SimplifiedInstance s1 = createInstance();
+        final SimplifiedInstance s2 = createInstance();
+        final int id1 = s1.ns.getClusterId();
+        final int id2 = s2.ns.getClusterId();
+        final ViewExpectation bothActiveOn1 = new ViewExpectation(s1);
+        bothActiveOn1.setActiveIds(id1, id2);
+        final ViewExpectation bothActiveOn2 = new ViewExpectation(s2);
+        bothActiveOn2.setActiveIds(id1, id2);
+        waitFor(bothActiveOn1, 2000, "first should see both as active");
+        waitFor(bothActiveOn2, 2000, "second should see both as active");
+
+        s2.shutdown();
+        final ViewExpectation onlySelfOn1 = new ViewExpectation(s1);
+        onlySelfOn1.setActiveIds(id1);
+        onlySelfOn1.setInactiveIds(id2);
+        waitFor(onlySelfOn1, 2000, "first should only see itself after shutdown");
+    }
+
+    /** Same as the clean-shutdown variant, but the second instance crashes
+     * (simulated hard stop without proper dispose). */
+    @Test
+    public void testTwoNodesWithCrash() throws Throwable {
+        final SimplifiedInstance s1 = createInstance();
+        final SimplifiedInstance s2 = createInstance();
+        final ViewExpectation expectation1 = new ViewExpectation(s1);
+        final ViewExpectation expectation2 = new ViewExpectation(s2);
+        expectation1.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+        expectation2.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+        waitFor(expectation1, 2000, "first should see both as active");
+        waitFor(expectation2, 2000, "second should see both as active");
+
+        s2.crash();
+
+        final ViewExpectation expectation1AfterCrash = new ViewExpectation(s1);
+        expectation1AfterCrash.setActiveIds(s1.ns.getClusterId());
+        expectation1AfterCrash.setInactiveIds(s2.ns.getClusterId());
+        // message fixed: this scenario crashes s2, it does not shut it down
+        waitFor(expectation1AfterCrash, 2000, "first should only see itself after crash");
+    }
+    
+    /** Crash-and-slow-recovery scenario without a backlog of unseen changes. */
+    @Test
+    public void testTwoNodesWithCrashAndLongduringRecovery() throws Throwable {
+        doTestTwoNodesWithCrashAndLongduringDeactivation(false);
+    }
+    
+    /** Crash-and-slow-recovery scenario with a backlog of changes written by the
+     * crashed instance but not yet seen by the survivor. */
+    @Test
+    public void testTwoNodesWithCrashAndLongduringRecoveryAndBacklog() throws Throwable {
+        doTestTwoNodesWithCrashAndLongduringDeactivation(true);
+    }
+    
+    /**
+     * Crashes s2 while s1's lastRev recovery is slowed down to a crawl via a mocked
+     * MissingLastRevSeeker (two semaphores gate acquire/release of the recovery
+     * lock), verifying the cluster view reported by s1 in each phase. With
+     * withBacklog==true, s1's background read thread is additionally stopped so
+     * that s2's last writes are not yet visible and a 'final=false' view is expected.
+     */
+    void doTestTwoNodesWithCrashAndLongduringDeactivation(boolean withBacklog) throws Throwable {
+        final int TEST_WAIT_TIMEOUT = 10000;
+        final SimplifiedInstance s1 = createInstance();
+        final SimplifiedInstance s2 = createInstance();
+        final ViewExpectation expectation1 = new ViewExpectation(s1);
+        final ViewExpectation expectation2 = new ViewExpectation(s2);
+        expectation1.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+        expectation2.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+        waitFor(expectation1, TEST_WAIT_TIMEOUT, "first should see both as active");
+        waitFor(expectation2, TEST_WAIT_TIMEOUT, "second should see both as active");
+
+        // before crashing s2, make sure that s1's lastRevRecovery thread doesn't run
+        s1.stopLastRevThread();
+        if (withBacklog) {
+            // plus also stop s1's backgroundReadThread - in case we want to test backlog handling
+            s1.stopBgReadThread();
+
+            // and then, if we want to do backlog testing, then s2 should write something
+            // before it crashes, so here it comes:
+            s2.addNode("/foo/bar");
+            s2.setProperty("/foo/bar", "prop", "value");
+        }
+
+        // then crash s2
+        s2.crash();
+
+        // then wait 2 sec
+        Thread.sleep(2000);
+
+        // at this stage, while s2 has crashed, we have stopped s1's lastRevRecoveryThread, so we should still see both as active
+        logger.info(s1.getClusterViewStr());
+        final ViewExpectation expectation1AfterCrashBeforeLastRevRecovery = new ViewExpectation(s1);
+        expectation1AfterCrashBeforeLastRevRecovery.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+        waitFor(expectation1AfterCrashBeforeLastRevRecovery, TEST_WAIT_TIMEOUT, "first should still see both as active");
+
+        // the next part is a bit tricky: we want to fine-control the lastRevRecoveryThread's acquire/release locking.
+        // the chosen way to do this is to make heavy use of mockito and two semaphores:
+        // when acquireRecoveryLock is called, that thread should wait for the waitBeforeLocking semaphore to be released
+        final MissingLastRevSeeker missingLastRevUtil =
+                (MissingLastRevSeeker) PrivateAccessor.getField(s1.ns.getLastRevRecoveryAgent(), "missingLastRevUtil");
+        assertNotNull(missingLastRevUtil);
+        MissingLastRevSeeker mockedLongduringMissingLastRevUtil = mock(MissingLastRevSeeker.class, delegatesTo(missingLastRevUtil));
+        final Semaphore waitBeforeLocking = new Semaphore(0);
+        when(mockedLongduringMissingLastRevUtil.acquireRecoveryLock(anyInt())).then(new Answer<Boolean>() {
+
+            @Override
+            public Boolean answer(InvocationOnMock invocation) throws Throwable {
+                logger.info("going to waitBeforeLocking");
+                waitBeforeLocking.acquire();
+                logger.info("done with waitBeforeLocking");
+                return missingLastRevUtil.acquireRecoveryLock((Integer)invocation.getArguments()[0]);
+            }
+        });
+        PrivateAccessor.setField(s1.ns.getLastRevRecoveryAgent(), "missingLastRevUtil", mockedLongduringMissingLastRevUtil);
+
+        // so let's start the lastRevThread again and wait for that waitBeforeLocking semaphore to be hit
+        s1.startLastRevThread();
+        waitFor(new Expectation() {
+
+            @Override
+            public String fulfilled() throws Exception {
+                if (!waitBeforeLocking.hasQueuedThreads()) {
+                    return "no thread queued";
+                }
+                return null;
+            }
+
+        }, TEST_WAIT_TIMEOUT, "lastRevRecoveryThread should acquire a lock");
+
+        // at this stage the crashed s2 is still not in recovery mode, so let's check:
+        logger.info(s1.getClusterViewStr());
+        final ViewExpectation expectation1AfterCrashBeforeLastRevRecoveryLocking = new ViewExpectation(s1);
+        expectation1AfterCrashBeforeLastRevRecoveryLocking.setActiveIds(s1.ns.getClusterId(), s2.ns.getClusterId());
+        waitFor(expectation1AfterCrashBeforeLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see both as active");
+
+        // one thing, before we let the waitBeforeLocking go, setup the release semaphore/mock:
+        final Semaphore waitBeforeUnlocking = new Semaphore(0);
+        Mockito.doAnswer(new Answer<Void>() {
+            public Void answer(InvocationOnMock invocation) throws InterruptedException {
+                logger.info("Going to waitBeforeUnlocking");
+                waitBeforeUnlocking.acquire();
+                logger.info("Done with waitBeforeUnlocking");
+                missingLastRevUtil.releaseRecoveryLock((Integer)invocation.getArguments()[0]);
+                return null;
+            }
+        }).when(mockedLongduringMissingLastRevUtil).releaseRecoveryLock(anyInt());
+
+        // let go (or tschaedere loh)
+        waitBeforeLocking.release();
+
+        // then, right after we let the waitBeforeLocking semaphore go, we should see s2 in recovery mode
+        final ViewExpectation expectation1AfterCrashWhileLastRevRecoveryLocking = new ViewExpectation(s1);
+        expectation1AfterCrashWhileLastRevRecoveryLocking.setActiveIds(s1.ns.getClusterId());
+        expectation1AfterCrashWhileLastRevRecoveryLocking.setDeactivatingIds(s2.ns.getClusterId());
+        waitFor(expectation1AfterCrashWhileLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see s2 as recovering");
+
+        // ok, meanwhile, the lastRevRecoveryAgent should by now have reached
+        // releaseRecoveryLock and be blocked on the waitBeforeUnlocking semaphore
+        waitFor(new Expectation() {
+
+            @Override
+            public String fulfilled() throws Exception {
+                if (!waitBeforeUnlocking.hasQueuedThreads()) {
+                    return "no thread queued";
+                }
+                return null;
+            }
+
+        }, TEST_WAIT_TIMEOUT, "lastRevRecoveryThread should want to release a lock");
+
+        // so then, we should still see the same state
+        waitFor(expectation1AfterCrashWhileLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see s2 as recovering");
+
+        logger.info("Waiting 1,5sec");
+        Thread.sleep(1500);
+        logger.info("Waiting done");
+
+        // first, lets check to see what the view looks like - should be unchanged:
+        waitFor(expectation1AfterCrashWhileLastRevRecoveryLocking, TEST_WAIT_TIMEOUT, "first should still see s2 as recovering");
+
+        // let waitBeforeUnlocking go
+        logger.info("releasing waitBeforeUnlocking, state: "+s1.getClusterViewStr());
+        waitBeforeUnlocking.release();
+        logger.info("released waitBeforeUnlocking");
+
+        if (!withBacklog) {
+            final ViewExpectation expectationWithoutBacklog = new ViewExpectation(s1);
+            expectationWithoutBacklog.setActiveIds(s1.ns.getClusterId());
+            expectationWithoutBacklog.setInactiveIds(s2.ns.getClusterId());
+            waitFor(expectationWithoutBacklog, TEST_WAIT_TIMEOUT, "finally we should see s2 as completely inactive");
+        } else {
+            // wait just 2 sec to see if the bgReadThread is really stopped
+            logger.info("sleeping 2 sec");
+            Thread.sleep(2000);
+            logger.info("sleeping 2 sec done, state: "+s1.getClusterViewStr());
+
+            // when that's the case, check the view - it should now be in a special 'final=false' mode
+            final ViewExpectation expectationBeforeBgRead = new ViewExpectation(s1);
+            expectationBeforeBgRead.setActiveIds(s1.ns.getClusterId());
+            expectationBeforeBgRead.setDeactivatingIds(s2.ns.getClusterId());
+            expectationBeforeBgRead.setFinal(false);
+            waitFor(expectationBeforeBgRead, TEST_WAIT_TIMEOUT, "first should only see itself after shutdown");
+
+            // ok, now we explicitly do a background read to get out of the backlog situation
+            s1.ns.runBackgroundReadOperations();
+
+            final ViewExpectation expectationAfterBgRead = new ViewExpectation(s1);
+            expectationAfterBgRead.setActiveIds(s1.ns.getClusterId());
+            expectationAfterBgRead.setInactiveIds(s2.ns.getClusterId());
+            waitFor(expectationAfterBgRead, TEST_WAIT_TIMEOUT, "finally we should see s2 as completely inactive");
+        }
+    }
+
+    /** This test creates a large number of documentnodestores which it starts, runs, stops
+     * in a random fashion, always testing to make sure the clusterView is correct.
+     * Each loop iteration randomly reactivates, creates, shuts down or crashes an
+     * instance; the view is verified every CHECK_EVERY iterations and before any stop.
+     */
+    @Test
+    public void testLargeStartStopFiesta() throws Throwable {
+        final List<SimplifiedInstance> instances = new LinkedList<SimplifiedInstance>();
+        // clusterId -> workingDir of instances that were stopped, so they can be reactivated
+        final Map<Integer,String> inactiveIds = new HashMap<Integer,String>();
+        final Random random = new Random();
+        final int LOOP_CNT = 50; // with too many loops have also seen mongo connections becoming starved thus test failed
+        final int CHECK_EVERY = 3;
+        final int MAX_NUM_INSTANCES = 8;
+        for(int i=0; i<LOOP_CNT; i++) {
+            if (i%CHECK_EVERY==0) {
+                checkFiestaState(instances, inactiveIds.keySet());
+            }
+            final int nextInt = random.nextInt(5);
+//            logger.info("testLargeStartStopFiesta: iteration "+i+" with case "+nextInt);
+            String workingDir = UUID.randomUUID().toString();
+            switch(nextInt) {
+            case 0: {
+                // increase likelihood of creating instances..
+                // but reuse an inactive one if possible
+                if (inactiveIds.size()>0) {
+                    logger.info("Case 0 - reactivating an instance...");
+                    final int n = random.nextInt(inactiveIds.size());
+                    final Integer cid = new LinkedList<Integer>(inactiveIds.keySet()).get(n);
+                    final String reactivatedWorkingDir = inactiveIds.remove(cid);
+                    if (reactivatedWorkingDir==null) {
+                        fail("reactivatedWorkingDir null for n="+n+", cid="+cid+", other inactives: "+inactiveIds);
+                    }
+                    assertNotNull(reactivatedWorkingDir);
+                    logger.info("Case 0 - reactivated instance "+cid+", workingDir="+reactivatedWorkingDir);
+                    workingDir = reactivatedWorkingDir;
+                    logger.info("Case 0: creating instance");
+                    final SimplifiedInstance newInstance = createInstance(workingDir);
+                    newInstance.setLeastTimeout(5000);
+                    newInstance.startSimulatingWrites(500);
+                    logger.info("Case 0: created instance: "+newInstance.ns.getClusterId());
+                    if (newInstance.ns.getClusterId()!=cid) {
+                        logger.info("Case 0: reactivated instance did not take over cid - probably a testing artifact. expected cid: {}, actual cid: {}", cid, newInstance.ns.getClusterId());
+                        inactiveIds.put(cid,  reactivatedWorkingDir);
+                        // remove the newly reactivated from the inactives - although it shouldn't be there, it might!
+                        inactiveIds.remove(newInstance.ns.getClusterId());
+                    }
+                    instances.add(newInstance);
+                }
+                break;
+            }
+            case 1: {
+                // creates a new instance
+                if (instances.size()<MAX_NUM_INSTANCES) {
+                    logger.info("Case 1: creating instance");
+                    final SimplifiedInstance newInstance = createInstance(workingDir);
+                    newInstance.setLeastTimeout(5000);
+                    newInstance.startSimulatingWrites(500);
+                    logger.info("Case 1: created instance: "+newInstance.ns.getClusterId());
+                    instances.add(newInstance);
+                }
+                break;
+            }
+            case 2: {
+                // do nothing
+                break;
+            }
+            case 3: {
+                // shutdown instance
+                if (instances.size()>1) {
+                    // before shutting down: make sure we have a stable view (we could otherwise not correctly startup too)
+                    checkFiestaState(instances, inactiveIds.keySet());
+                    final SimplifiedInstance instance = instances.remove(random.nextInt(instances.size()));
+                    assertNotNull(instance.workingDir);
+                    logger.info("Case 3: Shutdown instance: "+instance.ns.getClusterId());
+                    inactiveIds.put(instance.ns.getClusterId(), instance.workingDir);
+                    instance.shutdown();
+                }
+                break;
+            }
+            case 4: {
+                // crash instance
+                if (instances.size()>1) {
+                    // before crashing make sure we have a stable view (we could otherwise not correctly startup too)
+                    checkFiestaState(instances, inactiveIds.keySet());
+                    final SimplifiedInstance instance = instances.remove(random.nextInt(instances.size()));
+                    assertNotNull(instance.workingDir);
+                    logger.info("Case 4: Crashing instance: "+instance.ns.getClusterId());
+                    inactiveIds.put(instance.ns.getClusterId(), instance.workingDir);
+                    // write something so the crashed instance leaves data behind for recovery
+                    instance.addNode("/"+instance.ns.getClusterId()+"/stuffForRecovery/"+random.nextInt(10000));
+                    instance.crash();
+                }
+                break;
+            }
+            }
+        }
+    }
+    
+    /** Logs the given root state and the names of its direct children (debug helper). */
+    private void dumpChildren(DocumentNodeState root) {
+        logger.info("testEmptyParentRecovery: root: "+root);
+        for (String childName : root.getChildNodeNames()) {
+            logger.info("testEmptyParentRecovery: a child: '"+childName+"'");
+        }
+    }
+    
+    /**
+     * Asserts that every running instance sees exactly the given sets of active
+     * and inactive cluster ids, waiting up to 20 seconds per instance.
+     */
+    private void checkFiestaState(final List<SimplifiedInstance> instances, Set<Integer> inactiveIds) throws Exception {
+        final List<Integer> activeIds = new LinkedList<Integer>();
+        for (SimplifiedInstance instance : instances) {
+            activeIds.add(instance.ns.getClusterId());
+        }
+        for (SimplifiedInstance instance : instances) {
+            final ViewExpectation expectation = new ViewExpectation(instance);
+            expectation.setActiveIds(activeIds.toArray(new Integer[activeIds.size()]));
+            expectation.setInactiveIds(inactiveIds.toArray(new Integer[inactiveIds.size()]));
+            waitFor(expectation, 20000, "checkFiestaState failed for "+instance+", with instances: "+instances+", and inactiveIds: "+inactiveIds);
+        }
+    }
+
+    /** Disposes all instances and stores and drops the Mongo collections, before and after each test. */
+    @Before
+    @After
+    public void clear() {
+        for(SimplifiedInstance i : allInstances) {
+            i.dispose();
+        }
+        // bugfix: reset the list, otherwise instances from earlier tests are
+        // disposed again on every subsequent setup/teardown
+        allInstances.clear();
+        for (DocumentMK mk : mks) {
+            mk.dispose();
+        }
+        mks.clear();
+        if (MONGO_DB) {
+            DB db = MongoUtils.getConnection().getDB();
+            MongoUtils.dropCollections(db);
+        }
+    }
+
+    /** Creates a DocumentMK against MongoDB or, when MONGO_DB is false, against
+     * lazily-initialized shared in-memory stores. */
+    private DocumentMK createMK(int clusterId, int asyncDelay) {
+        if (!MONGO_DB) {
+            if (ds == null) {
+                ds = new MemoryDocumentStore();
+            }
+            if (bs == null) {
+                bs = new MemoryBlobStore();
+            }
+            return createMK(clusterId, asyncDelay, ds, bs);
+        }
+        DB db = MongoUtils.getConnection().getDB();
+        return register(new DocumentMK.Builder().setMongoDB(db)
+                .setClusterId(clusterId).setAsyncDelay(asyncDelay).open());
+    }
+
+    /** Creates and registers a DocumentMK backed by the given document and blob stores. */
+    private DocumentMK createMK(int clusterId, int asyncDelay,
+                             DocumentStore ds, BlobStore bs) {
+        DocumentMK.Builder builder = new DocumentMK.Builder();
+        builder.setDocumentStore(ds).setBlobStore(bs);
+        builder.setClusterId(clusterId).setAsyncDelay(asyncDelay);
+        return register(builder.open());
+    }
+
+    /** Tracks the given DocumentMK so clear() can dispose it, then returns it. */
+    private DocumentMK register(DocumentMK mk) {
+        mks.add(mk);
+        return mk;
+    }
+
+}
