diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/ClusterView.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/ClusterView.java
new file mode 100644
index 0000000..7eebd4e
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/ClusterView.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.discoverylite;
+
+import java.util.Set;
+
+import aQute.bnd.annotation.ConsumerType;
+
+/**
+ * A ClusterView represents the state of the cluster at a particular
+ * point in time.
+ * <p>
+ * A cluster here refers to all instances writing heartbeats to the same mongo
+ * collection.
+ * <p>
+ * A ClusterView carries a unique id that identifies this particular
+ * incarnation of ClusterView throughout the cluster - ie all instances
+ * in the cluster (which are represented in the clusterInstances set)
+ * refer to this ClusterView via the very same id - thus this id
+ * is valid and unique in this cluster.
+ * <p>
+ * A ClusterViewChangeListener can be used to get notifications when the state
+ * of the cluster changes - thus whenever a new incarnation of ClusterView is sent.
+ */
+@ConsumerType
+public interface ClusterView {
+    
+    /**
+     * Each incarnation of the state of the cluster (represented by this
+     * ClusterView) carries a new, unique-but-clusterwide-known view id.
+     * @return the id that uniquely identifies this particular ClusterView
+     */
+    public String getId();
+    
+
+    /**
+     * The id that the local instance has been given - note that this
+     * id is not persisted but instead changes on each osgi activation
+     * of the containing service.
+     * <p>
+     * This id corresponds to InstanceStateChangeListener.handleLocalOakRuntimeInstanceId(id)
+     * @return the (runtime, non-persisted) id of the local instance
+     * @see InstanceStateChangeListener#handleLocalOakRuntimeInstanceId(String)
+     */
+    public String getLocalOakRuntimeInstanceId();
+    
+    /**
+     * Returns the set of instance ids that are currently active
+     * in this clusterView.
+     * @return the set of instance ids that are currently active
+     * in this clusterView
+     */
+    public Set<String> getActiveClusterInstanceIds();
+    
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/ClusterViewChangeListener.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/ClusterViewChangeListener.java
new file mode 100644
index 0000000..0a4ee29
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/ClusterViewChangeListener.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.discoverylite;
+
+import aQute.bnd.annotation.ConsumerType;
+
+/**
+ * A ClusterViewChangeListener is informed about every change of the
+ * active ClusterView.
+ * <p>
+ * That is, when the underlying heartbeat mechanism detects that an instance
+ * joined or left the cluster, a new viewId is declared and stored in
+ * mongo and subsequently announced to all instances in the cluster via
+ * this listener mechanism.
+ */
+@ConsumerType
+public interface ClusterViewChangeListener {
+
+    /**
+     * Informs about a change of the active ClusterView.
+     * @param newView the newly valid ClusterView
+     */
+    public void handleClusterViewChange(ClusterView newView);
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/InstanceStateChangeListener.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/InstanceStateChangeListener.java
new file mode 100644
index 0000000..2a23ea1
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/InstanceStateChangeListener.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.discoverylite;
+
+/**
+ * An InstanceStateChangeListener is informed about coming and leaving
+ * instances in the cluster.
+ * <p>
+ * Additionally, when an instance becomes inactive, two states
+ * are differentiated: whether the instance has a backlog that
+ * is pending to be processed or otherwise if the backlog
+ * was processed.
+ */
+public interface InstanceStateChangeListener {
+
+    /**
+     * An instance is in one of the states listed here.
+     * @see InstanceStateChangeListener#handleInstanceStateChange(String, State)
+     */
+    public static enum State {
+        ACTIVE,
+        INACTIVE_WITH_BACKLOG,
+        INACTIVE_NO_BACKLOG
+    }
+    
+    /**
+     * Informs about an instance state change plus
+     * details what the new state is.
+     * <p>
+     * The oakRuntimeInstanceId is the non-persisted, valid-at-runtime-only
+     * id that another instance hooked to the same oak repository was assigned.
+     * Higher-level code must map this instance to whatever other, eg persisted, id
+     * it wants. Typical use case would be that at startup, the localOakRuntimeInstanceId
+     * is mapped to a slingId via the repository and that thus everybody else can
+     * learn about this mapping in the same cluster too.
+     * <p>
+     * Note on the states:
+     * <ul>
+     *  <li>ACTIVE: indicates that this instance became active, ie joined the cluster</li>
+     *  <li>INACTIVE_WITH_BACKLOG: indicates that this instance became inactive but
+     *  oak still has a backlog of changes of that instance yet to be processed.
+     *  If this state is flagged, it will be followed with an INACTIVE_NO_BACKLOG
+     *  as soon as possible</li>
+     *  <li>INACTIVE_NO_BACKLOG: indicates that this instance became inactive
+     *  and has no more backlog. This is typically the last state an instance
+     *  has - until it potentially is restarted/rejoins</li>
+     * </ul>
+     * @param oakRuntimeInstanceId the id of the instance whose state has just changed
+     * @param newState the new state of this instance
+     */
+    public void handleInstanceStateChange(String oakRuntimeInstanceId, State newState);
+
+    /**
+     * Informs about the id of the local instance. Note that this id
+     * is not persisted, hence only valid at runtime and can change
+     * from oak start to oak start. Note however that it *could* be the same
+     * for subsequent oak starts, it's just not guaranteed to be.
+     * @param localOakRuntimeInstanceId the id of the local instance
+     */
+    public void handleLocalOakRuntimeInstanceId(String localOakRuntimeInstanceId);
+    
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/document/ClusterViewImpl.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/document/ClusterViewImpl.java
new file mode 100644
index 0000000..77b9651
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/document/ClusterViewImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.discoverylite.document;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.plugins.discoverylite.ClusterView;
+import org.apache.jackrabbit.oak.plugins.document.ClusterViewDocument;
+
+/** Default implementation of ClusterView that comes along with a nice toString() **/
+class ClusterViewImpl implements ClusterView {
+
+    private final Set<String> activeClusterInstanceIds;
+    private final String localOakRuntimeInstanceId;
+    private final String viewId;
+
+    public ClusterViewImpl(Set<String> activeClusterInstanceIds, String localOakRuntimeInstanceId, String viewId) {
+        // wrap so the set handed out via getActiveClusterInstanceIds() stays immutable
+        this.activeClusterInstanceIds = Collections.unmodifiableSet(activeClusterInstanceIds);
+        this.localOakRuntimeInstanceId = localOakRuntimeInstanceId;
+        this.viewId = viewId;
+    }
+    
+    @Override
+    public String toString() {
+        // fixed: previously printed 'ClusterEventImpl', which did not match the class name
+        return "ClusterViewImpl[viewId="+viewId+
+                ", localOakRuntimeInstanceId="+localOakRuntimeInstanceId+
+                ", activeClusterInstanceIds="+activeClusterInstanceIds+
+                "]";
+    }
+    
+    @Override
+    public String getLocalOakRuntimeInstanceId() {
+        return localOakRuntimeInstanceId;
+    }
+
+    @Override
+    public Set<String> getActiveClusterInstanceIds() {
+        return activeClusterInstanceIds;
+    }
+
+    @Override
+    public String getId() {
+        return viewId;
+    }
+
+    /**
+     * Builds a ClusterViewImpl from the persisted cluster view document.
+     * @param localOakRuntimeInstanceId the runtime id of the local instance
+     * @param viewDoc the persisted view document - must not be null (the
+     *        previous parameter name 'newViewOrNull' was misleading: the
+     *        document is dereferenced unconditionally)
+     * @return a new ClusterViewImpl mirroring the document's member ids and view id
+     */
+    static ClusterViewImpl fromDocument(String localOakRuntimeInstanceId, ClusterViewDocument viewDoc) {
+        Set<String> memberIds = new HashSet<String>();
+        for (Integer id : viewDoc.getMemberIds()) {
+            memberIds.add(String.valueOf(id.intValue()));
+        }
+        //TODO: should ClusterView.getId() return an int rather than a String? - would be more restrictive though
+        return new ClusterViewImpl(memberIds, localOakRuntimeInstanceId, String.valueOf(viewDoc.getViewId()));
+    }
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/document/DiscoveryLiteListener.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/document/DiscoveryLiteListener.java
new file mode 100644
index 0000000..24f20f5
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/document/DiscoveryLiteListener.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.discoverylite.document;
+
+/**
+ * Small listener-API between DocumentNodeStore and DocumentDiscoveryLiteService.
+ * <p>
+ * The DocumentDiscoveryLiteService must be informed whenever the DocumentNodeStore
+ * has detected a change in the clusterNodes collection (active/inactive/timed out/recovering)
+ * - and that is signalled via handleClusterStateChange().
+ * <p>
+ * Additionally the DocumentDiscoveryLiteService must be notified when the
+ * background-read has finished - as it could be waiting for a crashed node's recovery
+ * to finish - which it can only do by checking the lastKnownRevision of the crashed
+ * instance - and that check is best done right after the background read has finished
+ * (it could optionally do that purely time based as well, but going via a listener
+ * is more timely, that's why this approach has been chosen).
+ */
+public interface DiscoveryLiteListener {
+
+    /**
+     * Informs the listener that DocumentNodeStore has discovered a change in the
+     * clusterNodes collection.
+     */
+    public void handleClusterStateChange();
+    
+    /**
+     * Informs the listener that DocumentNodeStore has just finished another round
+     * of background-read.
+     */
+    public void handleBackgroundReadOperationDone();
+    
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/document/DocumentDiscoveryLiteService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/document/DocumentDiscoveryLiteService.java
new file mode 100644
index 0000000..86a80d5
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/discoverylite/document/DocumentDiscoveryLiteService.java
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.discoverylite.document;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.jackrabbit.oak.plugins.discoverylite.ClusterViewChangeListener;
+import org.apache.jackrabbit.oak.plugins.discoverylite.InstanceStateChangeListener;
+import org.apache.jackrabbit.oak.plugins.discoverylite.InstanceStateChangeListener.State;
+import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument;
+import org.apache.jackrabbit.oak.plugins.document.ClusterViewDocument;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.Revision;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The DocumentDiscoveryLiteService serves ClusterViewChangeListeners
+ * as well as InstanceStateChangeListeners, informing both of them
+ * when either the clusterview has changed or an instance has changed
+ * its state between active/inactive_with_backlog/inactive_no_backlog.
+ * <p>
+ * The DocumentDiscoveryLiteService uses DocumentNodeStore's clusterNodes
+ * collection as a kind of heartbeat-replacement mechanism. Ie the
+ * leases now become 'heartbeats' too (from a discovery-lite point of view
+ * at least).
+ * <p>
+ * The cluster is further derived from the active-and-not-recovering nodes
+ * and is explicitly persisted in the settings collection in a document
+ * with id 'clusterView'. That document both contains the current truth
+ * about what the clusterview looks like (ie about which instances
+ * are currently considered actively taking part in the cluster) as well
+ * as it contains a history for debugging purpose.
+ * <p>
+ * The reason for storing the clusterView explicitly and not solely
+ * relying on the source-data - which is the clusterNodes - is to make
+ * the clusterView more robust: by explicitly storing it,
+ * there are no rare edge cases where one instance
+ * interprets the clusterNodes' active state slightly differently
+ * than another. Also, storing the clusterView allows adding
+ * properties to 'an incarnation of the clusterView document' - that is:
+ * it allows to store a clusterViewId alongside with it. That can be
+ * used by upper layers (via the ClusterViewChangeListener / ClusterView)
+ * to identify the current clusterView precisely. Without an id that
+ * would not be possible.
+ * <p>
+ * Prerequisites that the clusterView mechanism is stable:
+ * <ul>
+ *  <li>the machine clocks are reasonably in sync - that is, they should
+ *  be off by magnitudes less than the lease updateFrequency/timeout</li>
+ *  <li>the write-delays from any instance to the mongo server where
+ *  the clusterNodes and settings collections are stored should be
+ *  very fast - at least orders of magnitudes lower again than the
+ *  lease timeout</li>
+ *  <li>when this instance notices that others have kicked it out of the
+ *  clusterView (which it can find out when either its clusterNodes
+ *  document is set to recovering or it is not in the clusterView
+ *  anymore, although it just was - ie not just because of a fresh start),
+ *  then this instance must step back gracefully. 
+ *  The exact definition is to be applied elsewhere - but it should
+ *  include: stopping to update its own lease, waiting for the view
+ *  to have stabilized - waiting for recovery of its own instance
+ *  by the remaining instances in the cluster to have finished -
+ *  and then probably waiting for another gracePeriod until it
+ *  might rejoin the cluster. In between, any commit should
+ *  fail with BannedFromClusterException</li>
+ * </ul>
+ */
+@Component(immediate = true)
+@Service(value = { DocumentDiscoveryLiteService.class })
+public class DocumentDiscoveryLiteService implements DiscoveryLiteListener {
+
+    /** Logger for this service and its inner background worker.
+     *  NOTE(review): consider 'private static final' per SLF4J convention. **/
+    final Logger logger = LoggerFactory.getLogger(this.getClass());
+    
+    /** Reasons the background worker may be woken up early.
+     *  NOTE(review): not referenced anywhere in the visible part of this
+     *  file - confirm it is used further down or remove it. **/
+    private static enum WakeupReason {
+        CLUSTER_STATE_CHANGED,
+        BACKGROUND_READ_FINISHED
+    }
+    
+    /**
+     * Background job that periodically (or when woken up early via notify on
+     * its monitor) calls checkView() to recalculate the cluster view.
+     * <p>
+     * Runs until its thread is interrupted; the thread is created as a daemon
+     * in init(), so it does not block JVM shutdown either.
+     */
+    private class BackgroundWorker implements Runnable {
+
+        /** adds jitter to the short retry-sleep so instances don't retry in lock-step **/
+        final Random random = new Random();
+        
+        @Override
+        public void run() {
+            logger.info("BackgroundWorker.run: start");
+            try{
+                doRun();
+            } finally {
+                logger.info("BackgroundWorker.run: end (finally)");
+            }
+        }
+
+        private void doRun() {
+            while(true) { // loops until the thread is interrupted
+                try{
+                    logger.trace("BackgroundWorker.doRun: going to call checkView");
+                    final boolean shortSleep = !checkView();
+                    logger.trace("BackgroundWorker.doRun: checkView terminated with {}", shortSleep);
+                    // unsettled view: retry quickly with jitter; settled view: poll every 5s
+                    final long sleepMillis = shortSleep ? (50 + random.nextInt(450)) : 5000;
+                    logger.trace("BackgroundWorker.doRun: sleeping {}ms", sleepMillis);
+                    synchronized(BackgroundWorker.this) {
+                        BackgroundWorker.this.wait(sleepMillis);
+                    }
+                    logger.trace("BackgroundWorker.doRun: done sleeping, looping");
+                } catch(InterruptedException ie) {
+                    // treat interruption as a stop request: restore the flag and exit.
+                    // (previously the interrupt was swallowed by the generic catch and
+                    // the loop could never be terminated)
+                    logger.info("BackgroundWorker.doRun: interrupted, stopping");
+                    Thread.currentThread().interrupt();
+                    return;
+                } catch(Exception e) {
+                    logger.error("doRun: got an exception: "+e, e);
+                    try{
+                        // back off before retrying after an unexpected failure
+                        Thread.sleep(5000);
+                    } catch(InterruptedException ie2) {
+                        // stop request while backing off - restore flag and exit
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                }
+            }
+        }
+        
+    }
+    
+    /** listeners interested in cluster view changes - replaced copy-on-write under 'lock' **/
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, policy = ReferencePolicy.DYNAMIC, referenceInterface = ClusterViewChangeListener.class)
+    private ClusterViewChangeListener[] clusterViewChangeListeners = new ClusterViewChangeListener[0];
+    
+    /** listeners interested in per-instance state changes - replaced copy-on-write under 'lock' **/
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, policy = ReferencePolicy.DYNAMIC, referenceInterface = InstanceStateChangeListener.class)
+    private InstanceStateChangeListener[] instanceStateChangeListeners = new InstanceStateChangeListener[0];
+    
+    /** object used for synchronizing actions on the eventListeners array **/
+    private final Object lock = new Object();
+
+    /** the last clusterview that was sent to listeners.
+     *  NOTE(review): read without holding 'lock' in bindClusterViewChangeListener -
+     *  consider making this volatile or reading it under the lock. **/
+    private ClusterViewImpl lastClusterViewSent;
+    
+    /** 
+     * the runtime, non-persisted id of the local instance. must not be currentView/CURRENT_VIEW_OBJECT_ID
+     * @see HeartbeatProcessor#CURRENT_VIEW_OBJECT_ID
+     */
+    private final String localOakRuntimeInstanceId = UUID.randomUUID().toString();
+
+    /** DocumentNodeStore's (hence local) clusterId - stays -1 until injectDocumentNodeStore() is called **/
+    private int documentNodeStoreClusterId = -1;
+
+    /** the DocumentNodeStore - used to get the active/inactive cluster ids from **/
+    private DocumentNodeStore documentNodeStore;
+
+    /** the previously calculated/persisted cluster view document **/
+    private ClusterViewDocument previousClusterView;
+
+    /** ids of instances that went inactive and whose backlog is fully processed.
+     *  NOTE(review): plain HashSet here vs ConcurrentSkipListSet for the sibling
+     *  set below - confirm both are only touched from the background thread,
+     *  otherwise the asymmetry looks accidental. **/
+    private Set<Integer> inactiveInstancesWithoutBacklog = new HashSet<Integer>();
+    /** ids of instances that went inactive but may still have unprocessed changes **/
+    private Set<Integer> inactiveInstancesWithBacklog = new ConcurrentSkipListSet<Integer>();
+
+    /** the background job recalculating the view - created and started in init() **/
+    private BackgroundWorker backgroundWorker;
+
+    /** true once activate() was called (osgi lifecycle) **/
+    private boolean activated;
+    
+    /** true once the background worker was started - guards against double init **/
+    private boolean initialized;
+
+    /**
+     * Injects the DocumentNodeStore to this DocumentDiscoveryLiteService for further necessary setup steps
+     * <p>
+     * The reason for going this unorthodox (read: non-osgi-like) path is: the DocumentNodeStore 
+     * is not an osgi service
+     * <p>
+     * NOTE(review): the listener is registered before 'documentNodeStore' and
+     * the clusterId are assigned under 'lock' - a callback arriving in between
+     * would see partially initialized state. TODO confirm the node store cannot
+     * call back that early.
+     * @param documentNodeStore the node store to read cluster state from
+     */
+    public void injectDocumentNodeStore(DocumentNodeStore documentNodeStore) {
+        // register for cluster-state / background-read notifications
+        documentNodeStore.setDiscoveryLiteListener(this);
+        synchronized(lock) {
+            this.documentNodeStore = documentNodeStore;
+            documentNodeStoreClusterId = documentNodeStore.getClusterId();
+            init();
+        }
+    }
+
+    /** On activate the DocumentDiscoveryLiteService tries to start the background job
+     *  (it only actually starts once the DocumentNodeStore has been injected too,
+     *  see init()). */
+    public void activate() {
+        logger.trace("activate: start");
+        synchronized(lock) {
+            activated = true;
+            init();
+        }
+        logger.trace("activate: end");
+    }
+    
+    /**
+     * Starts the background job once both preconditions are met: the service
+     * has been activated AND the DocumentNodeStore has been injected (ie the
+     * clusterId is known). No-op until then, and no-op if already initialized.
+     * <p>
+     * Callers must hold {@code lock}.
+     */
+    private void init() {
+        logger.trace("init: start");
+        if (initialized) {
+            logger.debug("init: already initialized");
+            return;
+        }
+        if (!activated) {
+            logger.debug("init: cannot initialize yet, not yet activated");
+            return;
+        }
+        if (documentNodeStoreClusterId==-1) {
+            logger.debug("init: cannot initialize yet, documentNodeStoreClusterId not yet set (via inject)");
+            return;
+        }
+        logger.info("init: initializing now. clusterId: {}", documentNodeStoreClusterId);
+        
+        backgroundWorker = new BackgroundWorker();
+        // named + daemon thread: identifiable in thread dumps and does not
+        // prevent JVM shutdown (previously the thread was anonymous)
+        Thread th = new Thread(backgroundWorker, "DocumentDiscoveryLiteService-BackgroundWorker");
+        th.setDaemon(true);
+        th.start();
+        
+        initialized = true;
+        
+        logger.trace("init: end");
+    }
+    
+    /** On deactivate the background job is stopped - if it was running at all.
+     *  NOTE(review): only the flags are reset here; the background worker
+     *  thread itself is not interrupted and its loop keeps running (it is a
+     *  daemon thread, so the JVM can still exit) - confirm whether it should
+     *  be stopped explicitly on deactivate. **/
+    protected void deactivate() {
+        logger.trace("deactivate: deactivated");
+
+        synchronized(lock) {
+            activated = false;
+            initialized = false;
+        }
+        logger.trace("deactivate: end");
+    }
+    
+    /**
+     * bind an instance state-change listener (osgi dynamic reference).
+     * The listener array is replaced copy-on-write under 'lock'; the initial
+     * callback announcing the local instance id is made outside the lock,
+     * presumably to avoid invoking listener code while holding it.
+     */
+    protected void bindInstanceStateChangeListener(final InstanceStateChangeListener listener) {
+        logger.trace("bindInstanceStateChangeListener: start. listener: {}", listener);
+        synchronized (lock) {
+            final List<InstanceStateChangeListener> currentList = new ArrayList<InstanceStateChangeListener>(
+                    Arrays.asList(instanceStateChangeListeners));
+            currentList.add(listener);
+            this.instanceStateChangeListeners = currentList
+                    .toArray(new InstanceStateChangeListener[currentList.size()]);
+        }
+        // immediately tell the new listener the local runtime id
+        listener.handleLocalOakRuntimeInstanceId(localOakRuntimeInstanceId);
+        logger.trace("bindInstanceStateChangeListener: end");
+    }
+    
+    /**
+     * unbind an instance state-change listener (osgi dynamic reference).
+     * The listener array is replaced copy-on-write under 'lock'.
+     */
+    protected void unbindInstanceStateChangeListener(final InstanceStateChangeListener listener) {
+        logger.trace("unbindInstanceStateChangeListener: start. listener: {}", listener);
+        synchronized (lock) {
+            final List<InstanceStateChangeListener> currentList = new ArrayList<InstanceStateChangeListener>(
+                    Arrays.asList(instanceStateChangeListeners));
+            currentList.remove(listener);
+            this.instanceStateChangeListeners = currentList
+                    .toArray(new InstanceStateChangeListener[currentList.size()]);
+        }
+        logger.trace("unbindInstanceStateChangeListener: end");
+    }
+    
+    /**
+     * bind a cluster event listener (osgi dynamic reference).
+     * If a view was already announced, the new listener immediately receives
+     * the last sent view so it does not miss the current cluster state.
+     * The callback is made outside 'lock', presumably to avoid invoking
+     * listener code while holding it.
+     * NOTE(review): lastClusterViewSent is read here without holding 'lock' -
+     * consider reading it under the lock or making the field volatile.
+     */
+    public void bindClusterViewChangeListener(final ClusterViewChangeListener listener) {
+        logger.trace("bindClusterViewChangeListener: start. listener: {}", listener);
+        synchronized (lock) {
+            final List<ClusterViewChangeListener> currentList = new ArrayList<ClusterViewChangeListener>(
+                    Arrays.asList(clusterViewChangeListeners));
+            currentList.add(listener);
+            this.clusterViewChangeListeners = currentList
+                    .toArray(new ClusterViewChangeListener[currentList.size()]);
+        }
+        if (lastClusterViewSent!=null) {
+            try{
+                logger.debug("bindClusterViewChangeListener: sending event to listener: {} (view: {})", listener, lastClusterViewSent);
+                listener.handleClusterViewChange(lastClusterViewSent);
+                logger.debug("bindClusterViewChangeListener: sending event to listener done");
+            } catch(RuntimeException re) {
+                // a misbehaving listener must not break the bind of the service
+                logger.warn("bindClusterViewChangeListener: listener threw RuntimeException: "+re, re);
+            }
+        }
+        logger.trace("bindClusterViewChangeListener: end");
+    }
+    
+    /**
+     * unbind a cluster event listener (osgi dynamic reference).
+     * The listener array is replaced copy-on-write under 'lock'.
+     */
+    public void unbindClusterViewChangeListener(final ClusterViewChangeListener listener) {
+        logger.trace("unbindClusterViewChangeListener: start. eventListener: {}", listener);
+        synchronized (lock) {
+            final List<ClusterViewChangeListener> currentList = new ArrayList<ClusterViewChangeListener>(
+                    Arrays.asList(clusterViewChangeListeners));
+            currentList.remove(listener);
+            this.clusterViewChangeListeners = currentList
+                    .toArray(new ClusterViewChangeListener[currentList.size()]);
+        }
+        logger.trace("unbindClusterViewChangeListener: end");
+    }
+    
+    private boolean checkView() {
+        logger.trace("checkView: start");
+        List<ClusterNodeInfoDocument> allClusterNodes = 
+                ClusterNodeInfoDocument.all(documentNodeStore.getDocumentStore());
+        
+        Map<Integer,ClusterNodeInfoDocument> allNodeIds = new HashMap<Integer,ClusterNodeInfoDocument>();
+        Map<Integer,ClusterNodeInfoDocument> activeNodes = new HashMap<Integer,ClusterNodeInfoDocument>();
+        Map<Integer,ClusterNodeInfoDocument> timedOutNodes = new HashMap<Integer,ClusterNodeInfoDocument>();
+        Map<Integer,ClusterNodeInfoDocument> recoveringNodes = new HashMap<Integer,ClusterNodeInfoDocument>();
+        Map<Integer,ClusterNodeInfoDocument> inactiveNodes = new HashMap<Integer,ClusterNodeInfoDocument>();
+        
+        for (Iterator<ClusterNodeInfoDocument> it = allClusterNodes.iterator(); it.hasNext();) {
+            ClusterNodeInfoDocument clusterNode = it.next();
+            allNodeIds.put(clusterNode.getClusterId(), clusterNode);
+            if (clusterNode.isBeingRecovered()) {
+                recoveringNodes.put(clusterNode.getClusterId(), clusterNode);
+            } else if (!clusterNode.isActive()) {
+                inactiveNodes.put(clusterNode.getClusterId(), clusterNode);
+            } else if (clusterNode.getLeaseEndTime()<System.currentTimeMillis()) {
+                timedOutNodes.put(clusterNode.getClusterId(), clusterNode);
+            } else {
+                activeNodes.put(clusterNode.getClusterId(), clusterNode);
+            }
+        }
+        // protect against changes of these maps by making them immutable:
+        allNodeIds = Collections.unmodifiableMap(allNodeIds);
+        activeNodes = Collections.unmodifiableMap(activeNodes);
+        timedOutNodes = Collections.unmodifiableMap(timedOutNodes);
+        recoveringNodes = Collections.unmodifiableMap(recoveringNodes);
+        inactiveNodes = Collections.unmodifiableMap(inactiveNodes);
+        
+        // let's calculate the nodes that should make up the current view:
+        // it should contain all the active nodes - plus those that are
+        // active but timed out and not yet recovered/flagged-inactive.
+        // (reason for including the timedout: they will yet have to 
+        // switch to recovering or inactive - but we DONT KNOW yet.. that's
+        // predicting the future - so so far we have to stick with 
+        // including them in the view)
+        final HashMap<Integer, ClusterNodeInfoDocument> m = new HashMap<Integer,ClusterNodeInfoDocument>(activeNodes);
+        m.putAll(timedOutNodes);
+        final Map<Integer, ClusterNodeInfoDocument> viewNodes = Collections.unmodifiableMap(m);
+        
+        logger.debug("checkView: active nodes: {}, timed out nodes: {}, recovering nodes: {}, inactive nodes: {}, total: {}, hence view nodes: {}",
+                activeNodes.size(), timedOutNodes.size(), recoveringNodes.size(), inactiveNodes.size(), allNodeIds.size(), viewNodes.size());
+        
+        //next step: besides updating the clusterView, also serve
+        //the instancechangelisteners ...
+        //kind of orthogonal - so maybe in a different method
+        //to which for both checkView and that one the parameter
+        //is the set of allClusterNodes ...
+
+        
+        final ClusterViewDocument originalView = previousClusterView;
+        final ClusterViewDocument newView = doCheckView(viewNodes);
+        logger.debug("checkView: viewFine: {}, originalView: {}, previousClusterView: {}, newView: {}", newView!=null, originalView, previousClusterView, newView);
+        
+        if (newView!=null) {
+            // make sure we move all instances that we had marked as inactive to active (ie remove from there)
+            Set<Integer> newlyActive = new HashSet<Integer>();
+            newlyActive.addAll(newView.getMemberIds());
+            newlyActive.retainAll(inactiveInstancesWithBacklog);
+            if (newlyActive.size()>0) {
+                logger.info("checkView: following instances were marked inactive_with_backlog and are now back ACTIVE: {}", newlyActive);
+                inactiveInstancesWithBacklog.removeAll(newlyActive);
+                sendInstancesStateChange(newlyActive, State.ACTIVE);
+            } else {
+                logger.debug("checkView: no inactive_with_backlog became active at the moment.");
+            }
+            newlyActive.clear();;
+            newlyActive.addAll(newView.getMemberIds());
+            newlyActive.retainAll(inactiveInstancesWithoutBacklog);
+            if (newlyActive.size()>0) {
+                logger.info("checkView: following instances were marked inactive_without_backlog and are now back ACTIVE: {}", newlyActive);
+                inactiveInstancesWithoutBacklog.removeAll(newlyActive);
+                sendInstancesStateChange(newlyActive, State.ACTIVE);
+            } else {
+                logger.debug("checkView: no inactive_without_backlog became active at the moment.");
+            }
+        }
+        
+        // the algo is as follows:
+        // 1 all instances that are in recovering state are added to inactiveInstancesWithBacklog if they are not there already
+        // 2 any time an instance is added to inactiveInstancesWithBacklog it triggers an event to be sent to listeners
+        // 3 all instances that are in inactive state are added to inactiveInstancesWithBacklog if they are not there already
+        // 4 all instances that are in inactive state are then checked if this instance has seen all that instance has last written
+        // 4b once this instance has seen all changes of an inactive instance, it is put to inactiveInstancesWithoutBacklog if not there already
+        // 5 any time an instance is added to inactiveInstancesWithoutBacklog it triggers an event to be sent to listeners
+        
+        logger.debug("checkView: starting with: inactive instances with backlog: {}, inactive instances without backlog: {}",
+                inactiveInstancesWithBacklog, inactiveInstancesWithoutBacklog);
+
+        // 1
+        Set<Integer> newlyRecoveringNodeIds = new HashSet<Integer>();
+        newlyRecoveringNodeIds.addAll(recoveringNodes.keySet());
+        newlyRecoveringNodeIds.removeAll(inactiveInstancesWithBacklog);
+        
+        if (newlyRecoveringNodeIds.size()==0) {
+            logger.debug("checkView: no newly recovering nodes at the moment.");
+        } else {
+            // 2
+            // for all those an event 'INACTIVE_WITH_BACKLOG' must be sent
+            logger.info("checkView: we have the following newly recovering node ids: {}", newlyRecoveringNodeIds);
+            sendInstancesStateChange(newlyRecoveringNodeIds, InstanceStateChangeListener.State.INACTIVE_WITH_BACKLOG);
+            
+            // once that's done, add those to the list of instances that had a backlog and for which
+            // the above event was sent (this time or earlier)
+            addInactiveWithBacklog(newlyRecoveringNodeIds);
+        }
+
+        // 3
+        Set<Integer> newlyBacklogfreeCandidates = new HashSet<Integer>();
+        newlyBacklogfreeCandidates.addAll(inactiveNodes.keySet());
+        newlyBacklogfreeCandidates.addAll(inactiveInstancesWithBacklog);
+        newlyBacklogfreeCandidates.removeAll(inactiveInstancesWithoutBacklog);
+        if (newlyBacklogfreeCandidates.size()==0) {
+            logger.debug("checkView: no new candidates without backlog at the moment");
+        } else {
+            logger.debug("checkView: new candidates without backlog: #: {}, list: {}", newlyBacklogfreeCandidates.size(), newlyBacklogfreeCandidates);
+            
+            // check for all in instancesWithoutBacklog check if we have read all their
+            // changes - that is, if the lastKnownRevision of that covers the one that is
+            // stored in clusterNodes=>_id:ultimateRootRev
+            String[] lastKnownRevisions = documentNodeStore.getMBean().getLastKnownRevisions();
+            Map<Integer,String> lastKnownRevisionMap = new HashMap<Integer,String>();
+            for (int i = 0; i < lastKnownRevisions.length; i++) {
+                String aLastKnownRevisionStr = lastKnownRevisions[i];
+                String[] split = aLastKnownRevisionStr.split("=");
+                if (split.length==2) {
+                    try{
+                        Integer id = Integer.parseInt(split[0]);
+                        lastKnownRevisionMap.put(id, split[1]);
+                    } catch(NumberFormatException nfe) {
+                        logger.warn("checkView: could not parse integer '"+split[0]+"': "+nfe, nfe);
+                    }
+                } else {
+                    logger.warn("checkView: cannot parse lastKnownRevision: "+aLastKnownRevisionStr);
+                }
+            }
+            logger.debug("checkView: newlyBacklogfreeNodeIds.size: {}", newlyBacklogfreeCandidates.size());
+            Set<Integer> newlyBacklogfreeNodeIds = new HashSet<Integer>();
+            Set<Integer> instancesStillHavingBacklog = new HashSet<Integer>();
+            for (Iterator<Integer> it = newlyBacklogfreeCandidates.iterator(); it
+                    .hasNext();) {
+                final Integer aNodeId = it.next();
+                final String lastKnownRevision = lastKnownRevisionMap.get(aNodeId);
+                if (lastKnownRevision==null) {
+                    logger.warn("checkView: could not find lastKnownRevision for clusterNodeId="+aNodeId+", treading as in sync.");
+                } else {
+                    final ClusterNodeInfoDocument clusterNode = allNodeIds.get(aNodeId);
+                    if (clusterNode!=null) {
+                        final String ultimateRootRevStr = clusterNode.getLastBackgroundWrittenRootRev();
+                        if (ultimateRootRevStr==null) {
+                            logger.info("checkView: no indication about what the other instance has last written. Treating as synched.");
+                            newlyBacklogfreeNodeIds.add(aNodeId);
+                        } else {
+                            final Revision ultimateRootRev = Revision.fromString(ultimateRootRevStr);
+                            final Revision lastKnownRev = Revision.fromString(lastKnownRevision);
+                            if (Revision.getTimestampDifference(lastKnownRev, ultimateRootRev)>=0) {
+                                // then we have fully seen this instance
+                                logger.info("checkView: we have fully seen all changes by instance {}, lastKnownRev: {}, ultimateRootRev: {}", aNodeId, lastKnownRevision, ultimateRootRevStr);
+                                newlyBacklogfreeNodeIds.add(aNodeId);
+                            } else {
+                                // not yet seen it all
+                                logger.debug("checkView: we have not yet seen all changes by instance {}, lastKnownRev: {}, ultimateRootRev: {}", aNodeId, lastKnownRevision, ultimateRootRevStr);
+                                instancesStillHavingBacklog.add(aNodeId);
+                            }
+                        }
+                    }
+                }
+            }
+            
+            // as a result, we now have a list of nodeIds that still have a backlog - let's add those to the inactiveInstancesWithBacklog for future processing
+            addInactiveWithBacklog(instancesStillHavingBacklog);
+            
+            // and the other list is the ones that just became 'backlog free' in the meantime - so we must inform the listeners and add them to inactiveInstancesWithoutBacklog
+            addInactiveWithoutBacklog(newlyBacklogfreeNodeIds);
+        }
+        
+        logger.debug("checkView: as a result: inactive instances with backlog: {}, inactive instances without backlog: {}",
+                inactiveInstancesWithBacklog, inactiveInstancesWithoutBacklog);
+        
+        logger.trace("checkView: end, newView: {}", newView);
+        return newView!=null;
+    }
+
+    /**
+     * Moves the given node ids into the 'inactive without backlog' set and
+     * notifies the registered InstanceStateChangeListeners accordingly.
+     * <p>
+     * Ids already known as 'without backlog' are skipped (no duplicate event);
+     * ids previously listed as 'with backlog' are removed from that set first.
+     * @param nodeIds the ids of the instances that became backlog-free; may be null or empty
+     */
+    private void addInactiveWithoutBacklog(Set<Integer> nodeIds) {
+        if (nodeIds==null || nodeIds.size()==0) {
+            logger.trace("addInactiveWithoutBacklog: nodeIds is empty hence: returning immediately.");
+            return;
+        }
+        logger.trace("addInactiveWithoutBacklog: start. [nodeIds={}]", nodeIds);
+        
+        // copy the listener array under the lock so the notification below
+        // happens outside of the lock
+        final InstanceStateChangeListener[] instanceStateChangeListeners;
+        synchronized(lock) {
+            instanceStateChangeListeners = Arrays.copyOf(this.instanceStateChangeListeners, this.instanceStateChangeListeners.length);
+        }
+
+        for (Integer aNodeId : nodeIds) {
+            if (inactiveInstancesWithoutBacklog.contains(aNodeId)) {
+                // skip - the event for this id was already sent earlier
+                logger.trace("addInactiveWithoutBacklog: already known instance without backlog [hence ignoring]: {}", aNodeId);
+                continue;
+            }
+            if (inactiveInstancesWithBacklog.remove(aNodeId)) {
+                logger.debug("addInactiveWithoutBacklog: removed from inactiveInstancesWithBacklog: {}", aNodeId);
+            }
+            logger.info("addInactiveWithoutBacklog: new instance without backlog added: {}", aNodeId);
+            inactiveInstancesWithoutBacklog.add(aNodeId);
+            
+            sendInstanceStateChange(State.INACTIVE_NO_BACKLOG, instanceStateChangeListeners, aNodeId);
+        }
+        logger.trace("addInactiveWithoutBacklog: end");
+    }
+
+    /**
+     * Moves the given node ids into the 'inactive with backlog' set and
+     * notifies the registered InstanceStateChangeListeners accordingly.
+     * <p>
+     * Ids already known as 'with backlog' are skipped (no duplicate event);
+     * ids previously listed as 'without backlog' are removed from that set first.
+     * @param nodeIds the ids of the instances that became inactive with backlog; may be null or empty
+     */
+    private void addInactiveWithBacklog(Set<Integer> nodeIds) {
+        if (nodeIds==null || nodeIds.size()==0) {
+            logger.trace("addInactiveWithBacklog: nodeIds is empty hence: returning immediately.");
+            return;
+        }
+        logger.trace("addInactiveWithBacklog: start. [nodeIds={}]", nodeIds);
+        
+        // copy the listener array under the lock so the notification below
+        // happens outside of the lock
+        final InstanceStateChangeListener[] instanceStateChangeListeners;
+        synchronized(lock) {
+            instanceStateChangeListeners = Arrays.copyOf(this.instanceStateChangeListeners, this.instanceStateChangeListeners.length);
+        }
+
+        for (Integer aNodeId : nodeIds) {
+            if (inactiveInstancesWithBacklog.contains(aNodeId)) {
+                // skip - the event for this id was already sent earlier
+                logger.trace("addInactiveWithBacklog: already known instance with backlog [hence ignoring]: {}", aNodeId);
+                continue;
+            }
+            if (inactiveInstancesWithoutBacklog.remove(aNodeId)) {
+                logger.debug("addInactiveWithBacklog: instance was listed as without backlog - removed now: {}", aNodeId);
+            }
+            logger.info("addInactiveWithBacklog: new instance with backlog added: {}", aNodeId);
+            inactiveInstancesWithBacklog.add(aNodeId);
+            
+            sendInstanceStateChange(State.INACTIVE_WITH_BACKLOG, instanceStateChangeListeners, aNodeId);
+        }
+        logger.trace("addInactiveWithBacklog: end");
+    }
+
+    /**
+     * Notifies all registered InstanceStateChangeListeners about a state
+     * change of each of the given instances.
+     * @param oakRuntimeInstanceIds the ids whose state changed
+     * @param newState the new state of those instances
+     */
+    private void sendInstancesStateChange(Set<Integer> oakRuntimeInstanceIds,
+            State newState) {
+        logger.trace("sendInstancesStateChange: oakRuntimeInstanceIds: {}, newState: {}", oakRuntimeInstanceIds, newState);
+        
+        // snapshot the listener array under the lock, then notify outside of it
+        final InstanceStateChangeListener[] listenersSnapshot;
+        synchronized(lock) {
+            listenersSnapshot = Arrays.copyOf(this.instanceStateChangeListeners, this.instanceStateChangeListeners.length);
+        }
+        
+        // fan out: one state-change notification per instance id
+        for (Integer oakRuntimeInstanceId : oakRuntimeInstanceIds) {
+            sendInstanceStateChange(newState, listenersSnapshot, oakRuntimeInstanceId);
+        }
+        
+        logger.trace("sendInstancesStateChange: end");
+    }
+
+    /**
+     * Informs all given listeners about a state change of one particular
+     * instance. Listener exceptions are caught and logged so one failing
+     * listener cannot prevent the remaining ones from being notified.
+     * @param newState the new state of the instance
+     * @param instanceStateChangeListeners snapshot of the listeners to notify
+     * @param oakRuntimeInstanceId the id of the instance whose state changed
+     */
+    private void sendInstanceStateChange(State newState,
+            final InstanceStateChangeListener[] instanceStateChangeListeners,
+            final Integer oakRuntimeInstanceId) {
+        logger.trace("sendInstanceStateChange: start [newState: {}, oakRuntimeInstanceId: {}, listeners: {}]", newState, oakRuntimeInstanceId, instanceStateChangeListeners);
+        // inform the InstanceStateChangeListeners
+        for (InstanceStateChangeListener listener : instanceStateChangeListeners) {
+            try{
+                // note: placeholder order fixed - first the id, then the listener
+                logger.debug("sendInstanceStateChange: sending event about id {} to listener: {}. (newState: {})", oakRuntimeInstanceId, listener, newState);
+                listener.handleInstanceStateChange(String.valueOf(oakRuntimeInstanceId.intValue()), newState);
+                logger.debug("sendInstanceStateChange: sending event done");
+            } catch(RuntimeException re) {
+                // don't propagate listener failures - log and continue with the next one
+                logger.warn("sendInstanceStateChange: listener threw RuntimeException: "+re, re);
+            }
+        }
+        logger.trace("sendInstanceStateChange: end");
+    }
+
+    /**
+     * Reads/updates the persisted clusterView to match the given nodes and -
+     * if the view changed (or is seen for the very first time) - sends a
+     * view-change event and remembers the new view as previousClusterView.
+     * @param activeNodes the nodes that shall make up the view
+     * @return the (possibly unchanged) current view - or null if a concurrent
+     *         update interfered and the check should be retried later
+     */
+    private ClusterViewDocument doCheckView(Map<Integer,ClusterNodeInfoDocument> activeNodes) {
+        logger.trace("doCheckView: start");
+        
+        ClusterViewDocument newViewOrNull = readOrUpdateView(activeNodes.keySet());
+        if (newViewOrNull==null) {
+            // then there was a concurrent update of the clusterView
+            // and we should do some quick backoff sleeping
+            logger.debug("doCheckView: readOrUpdateView returned null - concurrent view update, will retry later");
+            return null;
+        } else {
+            logger.debug("doCheckView: newViewOrNull is not null: {}", newViewOrNull);
+            // otherwise we now hold the newly valid view
+            // it could be the same or different to the previous one, let's check
+            if (previousClusterView==null) {
+                logger.debug("doCheckView: previousClusterView is null");
+                // oh ok, this is the very first one
+                sendEvent(ClusterViewImpl.fromDocument(localOakRuntimeInstanceId, newViewOrNull));
+                previousClusterView = newViewOrNull;
+                return newViewOrNull;
+            } else if (previousClusterView.getViewId().equals(newViewOrNull.getViewId())) {
+                // that's the normal case: the viewId matches, nothing has changed, we've already
+                // processed the previousClusterView, so:
+                logger.debug("doCheckView: view id is still the same - no further check needed");
+                return newViewOrNull;
+            } else {
+                // otherwise the view has changed
+                logger.info("doCheckView: view has changed from: {} to: {} - sending event...", previousClusterView, newViewOrNull);
+                sendEvent(ClusterViewImpl.fromDocument(localOakRuntimeInstanceId, newViewOrNull));
+                previousClusterView = newViewOrNull;
+                logger.debug("doCheckView: view change event sent.");
+                return newViewOrNull;
+            }
+        }
+    }
+    
+    /**
+     * Reads the currently persisted clusterView and - if it does not match the
+     * given activeMemberIds - tries to replace it with a new view containing
+     * exactly those ids.
+     * @param activeMemberIds the member ids the view shall contain
+     * @return the matching or newly created view - or null if the update failed
+     *         due to a concurrent modification (caller should retry later)
+     */
+    private ClusterViewDocument readOrUpdateView(Set<Integer> activeMemberIds) {
+        logger.trace("readOrUpdateView: start");
+        // parameterized logging avoids building the String when debug is disabled
+        logger.debug("readOrUpdateView: activeMemberIds: {}", activeMemberIds);
+        // check if the view matches what is stored in settings._id=>'clusterView'
+        final ClusterViewDocument currentClusterView = ClusterViewDocument.read(documentNodeStore);
+        
+        logger.debug("readOrUpdateView: got currentClusterView: {}", currentClusterView);
+        if (currentClusterView!=null && currentClusterView.matches(activeMemberIds)) {
+            // that should be the very most frequent case: stable clusterView
+            logger.debug("readOrUpdateView: end [current clusterView matches]");
+            return currentClusterView;
+        }
+        logger.info("readOrUpdateView: existing view ({}) does not exist or match activeMemberIds ({})", currentClusterView, activeMemberIds);
+        
+        final Integer[] memberIds = activeMemberIds.toArray(new Integer[activeMemberIds.size()]);
+        
+        logger.debug("readOrUpdateView: going to create a new clusterView with memberIds: {}", (Object[])memberIds);
+        final ClusterViewDocument resultingClusterView = 
+                ClusterViewDocument.updateAndRead(documentNodeStore, currentClusterView==null, memberIds);
+        
+        if (resultingClusterView==null) {
+            // then we failed in creating or updating the view - let's retry in a bit
+            logger.info("readOrUpdateView: end [could not update clusterView - will be retried in a moment]");
+            return null;
+        }
+        
+        // then we have successfully changed the view
+        logger.info("readOrUpdateView: end [updated clusterView successfully to {}]", resultingClusterView);
+        return resultingClusterView;
+    }
+
+    /** sends a new view to the registered ClusterViewChangeListeners **/
+    private void sendEvent(ClusterViewImpl view) {
+        logger.trace("sendEvent: start. event: {}", view);
+        // take a snapshot of the listeners while holding the lock
+        final ClusterViewChangeListener[] listenersSnapshot;
+        synchronized(lock) {
+            listenersSnapshot = Arrays.copyOf(this.clusterViewChangeListeners, this.clusterViewChangeListeners.length);
+        }
+        
+        // notify each listener - a misbehaving listener must not block the others
+        for (ClusterViewChangeListener listener : listenersSnapshot) {
+            try{
+                logger.debug("sendEvent: sending event to listener: {}. (view: {})", listener, view);
+                listener.handleClusterViewChange(view);
+                logger.debug("sendEvent: sending event to listener done");
+            } catch(RuntimeException re) {
+                logger.warn("sendEvent: listener threw RuntimeException: "+re, re);
+            }
+        }
+        // remember what was last sent out (written under the same lock as read)
+        synchronized(lock) {
+            lastClusterViewSent = view;
+        }
+        logger.trace("sendEvent: end");
+    }
+
+    /**
+     * Callback invoked when the clusternodes collection changed - wakes up the
+     * background worker, which checks the view and sends events in its own thread.
+     */
+    @Override
+    public void handleClusterStateChange() {
+        // handleClusterStateChange is needed to learn about any state change in the clusternodes 
+        // collection asap and being able to react on it - so this will wake up the
+        // backgroundWorker which in turn will - in a separate thread - check the view
+        // and send out events accordingly
+        wakeupBackgroundWorker(WakeupReason.CLUSTER_STATE_CHANGED);
+    }
+
+    /**
+     * Callback invoked after a background read finished - used to detect as
+     * early as possible the moment a recovered instance's last changes have
+     * been read locally (ie its 'backlog' is gone). A no-op unless instances
+     * with a backlog are currently pending (see wakeupBackgroundWorker).
+     */
+    @Override
+    public void handleBackgroundReadOperationDone() {
+        // handleBackgroundReadOperationDone is only used to react as quickly as possible
+        // when we have instances that have a 'backlog' - ie when instances crashed
+        // and are being recovered - then we must wait until the recovery is finished
+        // AND until the subsequent background read actually reads that instance'
+        // last changes. To catch that moment as quickly as possible,
+        // this handleBackgroundReadOperationDone is used.
+        // Now from the above it also results that this only wakes up the
+        // backgroundWorker if we have any pending 'backlogy instances'
+        // otherwise this is a no-op
+        wakeupBackgroundWorker(WakeupReason.BACKGROUND_READ_FINISHED);
+    }
+
+    /**
+     * Wakes up the background worker thread - unless the reason is a finished
+     * background read and no instance currently has a backlog (in which case
+     * there is nothing the worker could newly detect, so waking it is pointless).
+     * @param wakeupReason why the caller wants the background worker to run
+     */
+    private void wakeupBackgroundWorker(WakeupReason wakeupReason) {
+        final BackgroundWorker bw = backgroundWorker;
+        if (bw!=null) {
+            synchronized(bw) {
+                if (wakeupReason==WakeupReason.BACKGROUND_READ_FINISHED) {
+                    // then only forward the notify if the inactiveInstancesWithBacklog
+                    // has anything that we could be waiting for - otherwise
+                    // we dont need to wakeup the background thread
+                    // NOTE(review): inactiveInstancesWithBacklog is read here under 'bw'
+                    // while other methods access it under 'lock' - confirm this is intended
+                    if (inactiveInstancesWithBacklog.isEmpty()) {
+                        logger.trace("wakeupBackgroundWorker: not waking up backgroundWorker, as we have an empty inactiveInstancesWithBacklog");
+                        return;
+                    }
+                }
+                bw.notifyAll();
+            }
+        }
+    }
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
index 5acebda..c4d6b53 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
@@ -59,6 +59,15 @@ public class ClusterNodeInfo {
      * The end of the lease.
      */
     public static final String LEASE_END_KEY = "leaseEnd";
+    
+    /**
+     * The key for the root-revision of the last background write (of unsaved modifications) 
+     * - that is: the last root-revision written by the instance in case of a clear shutdown
+     * or via recovery of another instance in case of a crash
+     * <p>
+     * Is only set when state is NONE (upon setting ACTIVE this flag is cleared)
+     */
+    public static final String LAST_WRITTEN_ROOT_REV_KEY = "lastWrittenRootRev";
 
     /**
      * The state of the cluster. On proper shutdown the state should be cleared.
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
index 969f437..c8c5c9f 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfoDocument.java
@@ -77,4 +77,8 @@ public class ClusterNodeInfoDocument extends Document {
     /** Reads the REV_RECOVERY_LOCK property and converts it to a RecoverLockState. */
     private RecoverLockState getRecoveryState(){
         return RecoverLockState.fromString((String) get(ClusterNodeInfo.REV_RECOVERY_LOCK));
     }
+
+    /**
+     * Returns the root revision last written by a background write for this
+     * cluster node, as stored under ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY
+     * - may be null if the property is not set.
+     */
+    public String getLastBackgroundWrittenRootRev() {
+        return (String) get(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY);
+    }
 }
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java
new file mode 100644
index 0000000..244f255
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterViewDocument.java
@@ -0,0 +1,291 @@
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.commons.json.JsopBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClusterViewDocument {
+    
+    private static final Logger logger = LoggerFactory.getLogger(ClusterViewDocument.class);
+
+    /** the id of this document is always 'clusterView' **/
+    private static final String CLUSTERVIEW_DOC_ID = "clusterView";
+    
+    // keys that we store in the root document - and in the history
+    private static final String CLUSTERVIEW_ID_KEY = "clusterViewId";
+    private static final String MEMBER_IDS_KEY = "memberIds";
+    private static final String CREATED_AT_KEY = "createdAt";
+    private static final String CREATED_BY_KEY = "createdBy";
+    private static final String RETIRED_AT_KEY = "retiredAt";
+    private static final String RETIRED_BY_KEY = "retiredBy";
+    private static final String CLUSTERVIEW_HISTORY_KEY = "clusterViewHistory";
+    
+    private static final DateFormat standardDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+    private static final int HISTORY_LIMIT = 5; 
+
+    /**
+     * Creates or updates the persisted clusterView so that it contains exactly
+     * the given memberIds - and verifies via re-read that this update won.
+     * <p>
+     * Compare-and-set style: the very first view is written via create() (which
+     * fails if the document already exists), later views via findAndUpdate()
+     * conditional on the previous view id. Returns null whenever a concurrent
+     * updater interfered - the caller is expected to retry in a moment.
+     * @param documentNodeStore the store carrying the settings collection
+     * @param isNew caller's hint that no view exists yet (currently only logged)
+     * @param memberIds the active member ids the new view shall contain
+     * @return the successfully persisted view - or null to signal a retry
+     */
+    public static ClusterViewDocument updateAndRead(DocumentNodeStore documentNodeStore,
+            boolean isNew, Integer[] memberIds) {
+        logger.trace("updateAndRead: updating clusterview: isNew: {}, memberIds: {}",
+                isNew, memberIds);
+        final int localClusterId = documentNodeStore.getClusterId();
+
+        final ClusterViewDocument previousView = read(documentNodeStore);
+        final UpdateOp updateOp = new UpdateOp(CLUSTERVIEW_DOC_ID, true);
+        final Date now = new Date();
+        updateOp.set(MEMBER_IDS_KEY, arrayToCsv(memberIds));
+        // SimpleDateFormat is not thread-safe - guard the shared formatter
+        synchronized(standardDateFormat) {
+            updateOp.set(CREATED_AT_KEY, standardDateFormat.format(now));
+        }
+        updateOp.set(CREATED_BY_KEY, localClusterId);
+        // carry over the previous history and append the view being retired now
+        Map<Revision,String> historyMap = new HashMap<Revision,String>();
+        if (previousView!=null) {
+            Map<Revision, String> previousHistory = previousView.getHistory();
+            if (previousHistory!=null) {
+                historyMap.putAll(previousHistory);
+            }
+            
+            historyMap.put(Revision.newRevision(localClusterId), 
+                    asHistoryEntry(previousView, localClusterId, now));
+        }
+        applyLimit(historyMap);
+        updateOp.set(CLUSTERVIEW_HISTORY_KEY, historyMap);
+        
+        final Integer newViewId;
+        if (previousView==null) {
+            // we are the first ever, looks like, that the clusterview is defined
+            // so we can use viewId==1 and we make sure no other cluster node
+            // tries to create this first one simultaneously - so we use
+            // 'create'
+            
+            // going via 'create' requires ID to be set again (not only in new UpdateOp(id,isNew)):
+            updateOp.set(Document.ID, CLUSTERVIEW_DOC_ID);
+            final ArrayList<UpdateOp> updateOps = new ArrayList<UpdateOp>();
+            newViewId=1;
+            updateOp.setNew(true); // paranoia as that's already set above
+            updateOp.set(CLUSTERVIEW_ID_KEY, newViewId);
+            updateOps.add(updateOp);
+            logger.debug("updateAndRead: trying to create the first ever clusterView - hence id "+newViewId);
+            if (!documentNodeStore.getDocumentStore().create(Collection.SETTINGS, updateOps)) {
+                logger.debug("updateAndRead: someone else just created the first view ever while I tried - reread that one later");
+                return null;
+            }
+        } else {
+            // there were earlier clusterViews (the normal case) - thus we 
+            // use 'findAndUpdate' with the condition that 
+            // the view id is still at the previousview one
+            final Integer previousViewId = previousView.getViewId();
+            updateOp.setNew(false); // change to false from true above
+            // note: UpdateOp.equals() here is Oak's condition-builder, not Object.equals
+            updateOp.equals(CLUSTERVIEW_ID_KEY, null, previousViewId);
+            newViewId = previousViewId+1;
+            updateOp.set(CLUSTERVIEW_ID_KEY, newViewId);
+            logger.debug("updateAndRead: trying to update the clusterView to id "+newViewId);
+            if (documentNodeStore.getDocumentStore().findAndUpdate(Collection.SETTINGS, updateOp)==null) {
+                logger.debug("updateAndRead: someone else just updated the view which I wanted to do as well - reread that one later");
+                return null;
+            }
+        }
+        
+        // whatever the outcome of the above - we don't care -
+        // re-reading will in any case definitely show what has been persisted
+        // and if the re-read view contains the same id, it is what we have written
+        // - otherwise someone else came in between and we have to step back and retry
+        final ClusterViewDocument readResult = read(documentNodeStore);
+        if (readResult==null) {
+            logger.debug("updateAndRead: got null from read - whatever the exact reason, we must retry in a moment.");
+            return null;
+        } else if (newViewId.equals(readResult.getViewId())) {
+            logger.debug("updateAndRead: matching view - no change");
+            return readResult;
+        } else {
+            logger.debug("updateAndRead: someone else in the cluster was updating right after I also succeeded - re-read in a bit");
+            return null;
+        }
+    }
+
+    /** Trims the history map to at most HISTORY_LIMIT entries by repeatedly dropping the oldest revision. */
+    private static void applyLimit(Map<Revision, String> historyMap) {
+        while (historyMap.size()>HISTORY_LIMIT) {
+            // scan for the key with the smallest (oldest) timestamp
+            Revision oldest = null;
+            for (Revision candidate : historyMap.keySet()) {
+                if (oldest==null || Revision.getTimestampDifference(candidate, oldest)<0) {
+                    oldest = candidate;
+                }
+            }
+            if (oldest==null) {
+                // defensive - cannot normally happen while size()>HISTORY_LIMIT
+                break;
+            }
+            historyMap.remove(oldest);
+        }
+    }
+
+    private static String asHistoryEntry(final ClusterViewDocument previousView, int localClusterNodeId, Date now) {
+        String h;
+        JsopBuilder b = new JsopBuilder();
+        b.object();
+        b.key(CLUSTERVIEW_ID_KEY);
+        b.value(previousView.getViewId());
+        b.key(CREATED_AT_KEY);
+        b.value(String.valueOf(previousView.getCreatedAt()));
+        b.key(CREATED_BY_KEY);
+        b.value(previousView.getCreatedBy());
+        b.key(RETIRED_AT_KEY);
+        b.value(String.valueOf(standardDateFormat.format(now)));
+        b.key(RETIRED_BY_KEY);
+        b.value(localClusterNodeId);
+        b.key(MEMBER_IDS_KEY);
+        b.value(arrayToCsv(previousView.getMemberIds().toArray()));
+        b.endObject();
+        h = b.toString();
+        return h;
+    }
+    
+    /** Returns the stored view history (Revision -> JSON history entry); may be null - TODO confirm against the constructor. */
+    private Map<Revision,String> getHistory() {
+        return viewHistory;
+    }
+
+    /**
+     * Joins the given array into a comma-separated String.
+     * @param arr the values to join
+     * @return the csv String - or null if the array is null or empty (callers
+     *         here rely on null rather than an empty String)
+     */
+    private static String arrayToCsv(Object[] arr) {
+        if (arr==null || arr.length==0) {
+            return null;
+        }
+        // StringBuilder suffices - no synchronization needed (vs StringBuffer)
+        final StringBuilder sb = new StringBuilder();
+        sb.append(String.valueOf(arr[0]));
+        for (int i = 1; i < arr.length; i++) {
+            sb.append(",");
+            sb.append(String.valueOf(arr[i]));
+        }
+        return sb.toString();
+    }
+    
+    private static Integer[] csvToIntegerArray(String csv) {
+        if (csv==null) {
+            return null;
+        }
+        String[] split = csv.split(",");
+        Integer[] result = new Integer[split.length];
+        for (int i = 0; i < split.length; i++) {
+            result[i] = Integer.parseInt(split[i]);
+        }
+        return result;
+    }
+    
+    public static ClusterViewDocument read(DocumentNodeStore documentNodeStore) {
+        final DocumentStore documentStore = documentNodeStore.getDocumentStore();
+        final Document doc = documentStore.find(Collection.SETTINGS, "clusterView", -1 /* -1; avoid caching */);
+        if (doc==null) {
+            return null;
+        } else {
+            final ClusterViewDocument clusterView = new ClusterViewDocument(doc);
+            if (clusterView.isValid()) {
+                return clusterView;
+            } else {
+                return null;
+            }
+        }
+    }
+
+    /** the id of this view, read from CLUSTERVIEW_ID_KEY - null renders the view invalid */
+    private final Integer viewId;
+
+    /** member ids parsed from the csv stored under MEMBER_IDS_KEY - null if absent or not a String */
+    private final Integer[] memberIds;
+
+    /** the view history read from CLUSTERVIEW_HISTORY_KEY - null if absent or not a Map */
+    private Map<Revision, String> viewHistory;
+
+    /** value of CREATED_AT_KEY - may be null */
+    private String createdAt;
+
+    /** value of CREATED_BY_KEY - may be null */
+    private Integer createdBy;
+
+    private ClusterViewDocument(Document doc) {
+        if (doc==null) {
+            throw new IllegalArgumentException("doc must not be null");
+        }
+        this.viewId = (Integer) doc.get(CLUSTERVIEW_ID_KEY);
+        this.createdAt = (String) doc.get(CREATED_AT_KEY);
+        this.createdBy = (Integer) doc.get(CREATED_BY_KEY);
+        
+        final Object obj = doc.get(MEMBER_IDS_KEY);
+        if (obj==null || !(obj instanceof String)) {
+            logger.trace("<init>: {} : {}", MEMBER_IDS_KEY, obj);
+            this.memberIds=null;
+        } else {
+            this.memberIds = csvToIntegerArray((String) obj);
+        }
+        
+        final Object obj2 = doc.get(CLUSTERVIEW_HISTORY_KEY);
+        if (obj2==null || !(obj2 instanceof Map)) {
+            logger.trace("<init> viewHistory is null");
+            this.viewHistory = null;
+        } else {
+            this.viewHistory = ((Map<Revision,String>)obj2);
+        }
+    }
+    
+    @Override
+    public String toString() {
+        String memberIdsToString;
+        if (memberIds==null) {
+            memberIdsToString = "null";
+        } else if (memberIds.length==1) {
+            memberIdsToString = String.valueOf(memberIds[0]);
+        } else {
+            StringBuffer sb = new StringBuffer(String.valueOf(memberIds[0]));
+            for (int i = 1; i < memberIds.length; i++) {
+                Integer anId = memberIds[i];
+                sb.append(",");
+                sb.append(anId);
+            }
+            memberIdsToString = sb.toString();
+        }
+        return "a ClusterView[valid="+isValid()+", viewId="+viewId+", memberIds="+memberIdsToString+"]";
+    }
+    
+    /**
+     * A view is valid when it carries a view id and a non-empty set of
+     * member ids.
+     */
+    public boolean isValid() {
+        return viewId!=null && memberIds!=null && memberIds.length>0;
+    }
+    
+    /** Returns the creation timestamp String read from the document - may be null. */
+    public String getCreatedAt() {
+        return createdAt;
+    }
+    
+    /** Returns the id of the cluster node that created this view - may be null. */
+    public Integer getCreatedBy() {
+        return createdBy;
+    }
+    
+    /** Returns the id of this view - null renders the view invalid. */
+    public Integer getViewId() {
+        return viewId;
+    }
+    
+    public Set<Integer> getMemberIds() {
+        return new HashSet<Integer>(Arrays.asList(memberIds));
+    }
+
+    public boolean matches(Set<Integer> activeMemberIds) {
+        if (memberIds==null) {
+            return activeMemberIds==null || activeMemberIds.size()==0;
+        }
+        
+        for (int i = 0; i < memberIds.length; i++) {
+            Integer aMemberId = memberIds[i];
+            if (!activeMemberIds.contains(aMemberId)) {
+                return false;
+            }
+        }
+        return activeMemberIds.size()==memberIds.length;
+    }
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
index a0176af..3e19fea 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
@@ -76,6 +76,7 @@ import org.apache.jackrabbit.oak.commons.json.JsopReader;
 import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
 import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
+import org.apache.jackrabbit.oak.plugins.discoverylite.document.DiscoveryLiteListener;
 import org.apache.jackrabbit.oak.plugins.document.Checkpoints.Info;
 import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
 import org.apache.jackrabbit.oak.plugins.document.mongo.MongoBlobReferenceIterator;
@@ -323,6 +324,9 @@ public final class DocumentNodeStore
      * The blob store.
      */
     private final BlobStore blobStore;
+    
+    /** Volatile so that, once set, the listener is immediately visible to all threads. **/
+    private volatile DiscoveryLiteListener discoveryLiteListener;
 
     /**
      * The BlobSerializer.
@@ -1656,8 +1660,11 @@ public final class DocumentNodeStore
     /**
      * Updates the state about cluster nodes in {@link #activeClusterNodes}
      * and {@link #inactiveClusterNodes}.
+     * @return true if the cluster state has changed, false if the cluster state
+     * remained unchanged
      */
-    void updateClusterState() {
+    boolean updateClusterState() {
+        boolean hasChanged = false;
         long now = clock.getTime();
         Set<Integer> inactive = Sets.newHashSet();
         for (ClusterNodeInfoDocument doc : ClusterNodeInfoDocument.all(store)) {
@@ -1665,14 +1672,15 @@ public final class DocumentNodeStore
             if (cId != this.clusterId && !doc.isActive()) {
                 inactive.add(cId);
             } else {
-                activeClusterNodes.put(cId, doc.getLeaseEndTime());
+                hasChanged |= activeClusterNodes.put(cId, doc.getLeaseEndTime())==null;
             }
         }
-        activeClusterNodes.keySet().removeAll(inactive);
-        inactiveClusterNodes.keySet().retainAll(inactive);
+        hasChanged |= activeClusterNodes.keySet().removeAll(inactive);
+        hasChanged |= inactiveClusterNodes.keySet().retainAll(inactive);
         for (Integer clusterId : inactive) {
-            inactiveClusterNodes.putIfAbsent(clusterId, now);
+            hasChanged |= inactiveClusterNodes.putIfAbsent(clusterId, now)==null;
         }
+        return hasChanged;
     }
 
     /**
@@ -2261,7 +2269,16 @@ public final class DocumentNodeStore
         }
         return blobGC;
     }
+    
+    public void setDiscoveryLiteListener(DiscoveryLiteListener discoveryLiteListener) {
+        this.discoveryLiteListener = discoveryLiteListener;
+    }
 
+    void signalClusterStateChange() {
+        if (discoveryLiteListener!=null) {
+            discoveryLiteListener.handleClusterStateChange();
+        }
+    }
     //-----------------------------< DocumentNodeStoreMBean >---------------------------------
 
     public DocumentNodeStoreMBean getMBean() {
@@ -2416,6 +2433,9 @@ public final class DocumentNodeStore
         @Override
         protected void execute(@Nonnull DocumentNodeStore nodeStore) {
             nodeStore.runBackgroundReadOperations();
+            if (nodeStore.discoveryLiteListener!=null) {
+                nodeStore.discoveryLiteListener.handleBackgroundReadOperationDone();
+            }
         }
     }
 
@@ -2429,9 +2449,13 @@ public final class DocumentNodeStore
         @Override
         protected void execute(@Nonnull DocumentNodeStore nodeStore) {
             if (nodeStore.renewClusterIdLease()) {
-                nodeStore.updateClusterState();
+                if (nodeStore.updateClusterState()) {
+                    // then inform the discovery lite listener - if it is registered
+                    nodeStore.signalClusterStateChange();
+                }
             }
         }
+
     }
 
     public BlobStore getBlobStore() {
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
index defcd28..be66e3a 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
@@ -65,6 +65,7 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.discoverylite.document.DocumentDiscoveryLiteService;
 import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
 import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
@@ -237,6 +238,10 @@ public class DocumentNodeStoreService {
     )
     private volatile DataSource blobDataSource;
 
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+            policy = ReferencePolicy.DYNAMIC)
+    private volatile DocumentDiscoveryLiteService documentDiscoveryLiteService;
+
     private DocumentMK mk;
     private ObserverTracker observerTracker;
     private ComponentContext context;
@@ -408,6 +413,9 @@ public class DocumentNodeStoreService {
             }
         }
 
+        if (documentDiscoveryLiteService!=null) {
+            documentDiscoveryLiteService.injectDocumentNodeStore(mk.getNodeStore());
+        }
         registerJMXBeans(mk.getNodeStore());
         registerLastRevRecoveryJob(mk.getNodeStore());
 
@@ -445,6 +453,15 @@ public class DocumentNodeStoreService {
     }
 
     @SuppressWarnings("UnusedDeclaration")
+    protected void bindDocumentDiscoveryLiteService(DocumentDiscoveryLiteService documentDiscoveryLiteService) throws IOException {
+        log.info("Initializing DocumentNodeStore with DocumentDiscoveryLiteService [{}]", documentDiscoveryLiteService);
+        this.documentDiscoveryLiteService = documentDiscoveryLiteService;
+        if (mk!=null && mk.getNodeStore()!=null) {
+            documentDiscoveryLiteService.injectDocumentNodeStore(mk.getNodeStore());
+        }
+    }
+
+    @SuppressWarnings("UnusedDeclaration")
     protected void bindBlobStore(BlobStore blobStore) throws IOException {
         log.info("Initializing DocumentNodeStore with BlobStore [{}]", blobStore);
         this.blobStore = blobStore;
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
index ea33a8f..4d3fb4e 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecoveryAgent.java
@@ -235,8 +235,16 @@ public class LastRevRecoveryAgent {
             return recover(suspects.iterator(), clusterId);
         } finally {
             Utils.closeIfCloseable(suspects);
+            
             // Relinquish the lock on the recovery for the cluster on the clusterInfo
+            //TODO: in case recover throws a RuntimeException (or Error..) then
+            //      the recovery might have failed, yet the instance is marked
+            //      as 'recovered' (by setting the state to NONE).
+            //      is this really fine here? or should we not retry - or at least
+            //      log the throwable?
             missingLastRevUtil.releaseRecoveryLock(clusterId);
+            
+            nodeStore.signalClusterStateChange();
         }
     }
 
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
index 1bcda54..03fdaa6 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/UnsavedModifications.java
@@ -210,6 +210,18 @@ class UnsavedModifications {
                 lastRev = null;
             }
         }
+        Revision writtenRootRev = pending.get("/");
+        if (writtenRootRev!=null) {
+            int cid = writtenRootRev.getClusterId();
+            if (store.getDocumentStore().find(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, String.valueOf(cid))!=null) {
+                UpdateOp update = new UpdateOp(String.valueOf(cid), false);
+                update.equals(Document.ID, null, cid);
+                update.set(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY, writtenRootRev.toString());
+                store.getDocumentStore().findAndUpdate(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, update);
+    //            store.getDocumentStore().createOrUpdate(org.apache.jackrabbit.oak.plugins.document.Collection.CLUSTER_NODES, update);
+            }
+        }
+        
         stats.write = clock.getTime() - time;
         return stats;
     }
