diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/ClusterView.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/ClusterView.java
new file mode 100644
index 0000000..3b98337
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/ClusterView.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.discovery;
+
+import java.util.Set;
+
+import aQute.bnd.annotation.ProviderType;
+
+/**
+ * A ClusterView represents the state of the cluster at a particular
+ * point in time.
+ * <p>
+ * A cluster here refers to all instances writing heartbeats to the same mongo
+ * collection.
+ * <p>
+ * A ClusterView carries a unique id that identifies this particular
+ * incarnation of ClusterView throughout the cluster - ie all instances
+ * in the cluster (which are represented in the clusterInstances set)
+ * refer to this ClusterView via the very same id - thus this id
+ * is valid and unique in this cluster.
+ * <p>
+ * A ClusterViewChangeListener can be used to get notifications when the state
+ * of the cluster changes - thus a new incarnation of ClusterView is sent.
+ */
+@ProviderType
+public interface ClusterView {
+
+    /**
+     * Each incarnation of the state of the cluster (represented by this
+     * ClusterView) carries a new, unique-but-clusterwide-known view id.
+     * @return the id that uniquely identifies this particular ClusterView
+     */
+    public String getId();
+
+    /**
+     * The id that the local instance has been given - note that this
+     * id is not persisted but instead changes on each osgi activation
+     * of the containing service (which is MongoDiscoveryService)
+     * @return the (runtime, non-persisted) id of the local instance
+     */
+    public String getLocalInstanceId();
+
+    /**
+     * Returns the set of instance ids that are currently active
+     * in this clusterView
+     * @return the set of instance ids that are currently active
+     * in this clusterView
+     */
+    public Set<String> getActiveClusterInstanceIds();
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/ClusterViewChangeListener.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/ClusterViewChangeListener.java
new file mode 100644
index 0000000..efa3657
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/ClusterViewChangeListener.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.discovery;
+
+import aQute.bnd.annotation.ConsumerType;
+
+/**
+ * A ClusterViewChangeListener is informed about every change of the
+ * active ClusterView.
+ * <p>
+ * That is, when the underlying heartbeat mechanism detects that an instance
+ * joined or left the cluster, a new viewId is declared and stored in
+ * mongo and subsequently announced to all instances in the cluster via
+ * this listener mechanism.
+ */
+@ConsumerType
+public interface ClusterViewChangeListener {
+
+    /**
+     * Informs about a change of the active ClusterView.
+     * @param newView the newly valid ClusterView (never null)
+     */
+    public void handleClusterViewChange(ClusterView newView);
+    
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/ClusterViewImpl.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/ClusterViewImpl.java
new file mode 100644
index 0000000..55ec51f
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/ClusterViewImpl.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.discovery.impl;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.discovery.ClusterView;
+
+/** Default implementation of ClusterView that comes along with a nice toString() **/
+public class ClusterViewImpl implements ClusterView {
+
+    private final Set<String> activeClusterInstanceIds;
+    private final String localInstanceId;
+    private final String viewId;
+
+    ClusterViewImpl(Set<String> activeClusterInstanceIds, String localInstanceId, String viewId) {
+        this.activeClusterInstanceIds = Collections.unmodifiableSet(new HashSet<String>(activeClusterInstanceIds)); // defensive copy: callers must not be able to mutate this view
+        this.localInstanceId = localInstanceId;
+        this.viewId = viewId;
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterViewImpl[viewId="+viewId+
+                ", localInstanceId="+localInstanceId+
+                ", activeClusterInstanceIds="+SetHelper.join(activeClusterInstanceIds, ",")+
+                "]";
+    }
+
+    @Override
+    public String getLocalInstanceId() {
+        return localInstanceId;
+    }
+
+    @Override
+    public Set<String> getActiveClusterInstanceIds() {
+        return activeClusterInstanceIds;
+    }
+
+    @Override
+    public String getId() {
+        return viewId;
+    }
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/HeartbeatProcessor.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/HeartbeatProcessor.java
new file mode 100644
index 0000000..9e4b398
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/HeartbeatProcessor.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.discovery.impl;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.WriteResult;
+
+/**
+ * A HeartbeatProcessor is responsible for reading the current list of heartbeats
+ * stored in the provided mongo collection, verifies if they are timed out
+ * (based on the local clock - hence clocks must be in sync by an enough margin
+ * compared to the heartbeat timeout), and compares the result with the
+ * 'currentView' which is a (persisted) document that contains the latest
+ * valid ClusterView (ie contains viewId and list of active instanceIds).
+ * When the active instances does not match what's stored in 'clusterView',
+ * then the HeartbeatProcessor updates the 'clusterView' - being aware that
+ * this can happen concurrently by any other instance. This 'conflict' is
+ * detected by adding 'viewId' to the query for updating - and if no document
+ * could be updated, someone else already updated the view. In that case, the
+ * whole procedure of detecting who's alive etc is restarted and at that point,
+ * the view should match and no update should become necessary. Thus, as another
+ * rule, whenever the viewId changes, there was an actual change in the list
+ * of participating, active instanceIds.
+ */
+public class HeartbeatProcessor {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private final DBCollection mongoDbCollection;
+
+    private final String localInstanceId;
+
+    private final long heartbeatTimeoutMillis;
+
+    private final int updateRetryCount;
+
+    private final Random random = new Random();
+
+    /**
+     * Creates a new HeartbeatProcessor ready for checking the view
+     * with the following parameters:
+     * @param mongoDbCollection the mongo collection where the heartbeats and 'currentView' 
+     * are stored
+     * @param localInstanceId the id of the local instance (only used for adding
+     * debug into to the 'currentView', namely: 'createdBy')
+     * @param heartbeatTimeoutMillis the timeout in milliseconds after which
+     * this processor considers a heartbeat as timed out and it removes the
+     * id from the currentView
+     * @param updateRetryCount the number of times the processor should retry
+     * if it concurrently tries to update the 'currentView'. Note that this should
+     * not be set too high compared to the heartbeatTimeoutMillis - as retries
+     * are done as part of one heartbeat - and between retries there are 50-500 millisecond
+     * sleeps. Thus too many retries will result in the risk of the own, local,
+     * heartbeat to become expired!
+     */
+    HeartbeatProcessor(DBCollection mongoDbCollection, 
+            String localInstanceId, long heartbeatTimeoutMillis, int updateRetryCount) {
+        this.mongoDbCollection = mongoDbCollection;
+        this.localInstanceId = localInstanceId;
+        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
+        this.updateRetryCount = updateRetryCount;
+    }
+
+    /** 
+     * Checks the current heartbeats timeouts and if they match the currentView, 
+     * updates the currentView if not matched, retries on 'update conflicts' as instructed.
+     * @return the current view, or null if it could not be established (error, interrupt, or too many conflicts) */
+    public ClusterViewImpl checkView() {
+        logger.trace("checkView: start");
+        for(int i=0; i<updateRetryCount; i++) {
+            logger.trace("checkView: iteration {} start", i);
+            try{
+                final ClusterViewImpl result = tryCheckView();
+                if (result!=null) {
+                    // success
+                    return result;
+                }
+                // otherwise: retry
+                final int randomSleepMillis = 50+random.nextInt(450);
+                logger.debug("checkView: reading view failed, backing off for {}ms.", randomSleepMillis);
+                Thread.sleep(randomSleepMillis);
+                logger.debug("checkView: retrying now.");
+                continue;
+            } catch(RuntimeException e) {
+                logger.error("checkView: RuntimeException occurred: "+e, e);
+                return null;
+            } catch (InterruptedException e) {
+                logger.info("checkView: got interrupted during backoff: "+e, e);
+                Thread.currentThread().interrupt(); // restore the interrupt status for the caller
+                return null; // interruption signals shutdown - stop retrying promptly
+            } finally {
+                logger.trace("checkView: iteration {} end", i);
+            }
+        }
+        logger.warn("checkView: could not read view after "+updateRetryCount+" tries. Giving up.");
+        logger.trace("checkView: end");
+        return null;
+    }
+
+    /**
+     * Tries once to check the current view (ie compare heartbeat timeouts vs the
+     * currentView's list of active instance ids), if it doesn't match, tries to
+     * update the currentView with what this instance believes to be valid. 
+     * If it fails to do so (which it detects by restricting the update to
+     * the previously known viewId, thus if the update returns with '0' updates
+     * it considers having detected an 'update conflict'), it just returns and
+     * lets the outer loop retry if feasible.
+     * @return the current view - or null in case of an 'update conflict'
+     */
+    private ClusterViewImpl tryCheckView() {
+        logger.trace("tryCheckView: start");
+
+        // find all
+        final DBCursor cursor = mongoDbCollection.find();
+        final Set<String> activeInstanceIds = new HashSet<String>();
+        final long now = System.currentTimeMillis();
+
+        Set<String> currentViewInstanceIds = null;
+        String currentViewId = null;
+        int currentViewModCnt = 0;
+
+        int numCnt = 0;
+        try {
+            while(cursor.hasNext()) {
+                numCnt++;
+                final DBObject n = cursor.next();
+                final String id = (String) n.get("_id");
+                final Date lastHeartbeat = (Date) n.get("lastHeartbeat");
+                if ("currentView".equals(id) && (lastHeartbeat==null)) {
+                    // _id='currentView' without a lastHeartbeat corresponds to the
+                    // actual group view that is stored in the fields:
+                    // viewId: the id of this view (nothing to do with any discovery definition of a clusterId)
+                    // instanceIds: comma separated list of instances that are part of this view
+                    // modCnt: counter that might be useful for debugging to see how many updates are done to this document
+                    //         (incremented on each update)
+                    // createdAt: timestamp when this document was last updated, ie when this view was established
+                    // createdBy: id of the instance that promoted the view
+
+                    final String viewId = (String)n.get("viewId");
+                    final String instanceIds = (String) n.get("instanceIds");
+                    final Object modCnt = n.get("modCnt");
+
+                    if (currentViewId!=null) {
+                        // simple check to detect duplicate currentView documents.
+                        // at the moment no further actions defined however - this
+                        // should never occur since upsert is used and there should
+                        // only be one document with '_id' => 'currentView'
+                        logger.warn("tryCheckView: multiple currentView documents found. Ignoring this one: viewId={}, modCnt={}, instanceIds={}", viewId, modCnt, instanceIds);
+                        continue;
+                    }
+
+                    currentViewId = viewId;
+                    if (modCnt!=null) {
+                        currentViewModCnt = ((Number) modCnt).intValue(); // mongo may deliver Integer or Long here
+                    }
+                    currentViewInstanceIds = new HashSet<String>(Arrays.asList(instanceIds.split(",")));
+                    logger.debug("tryCheckView: currentViewId={}, modCnt={}, instanceIds ({})={}", currentViewId, modCnt, currentViewInstanceIds.size(), instanceIds);
+                    continue;
+                }
+                if (id==null || lastHeartbeat==null) {
+                    logger.debug("tryCheckView: ignore: "+n);
+                    continue;
+                }
+                final long ageInMillis = now  - lastHeartbeat.getTime();
+                boolean isTimedOut = ageInMillis > heartbeatTimeoutMillis;
+                boolean veryOld = ageInMillis > 10*heartbeatTimeoutMillis; // if a heartbeat is 10x older than the timeout,
+                                                                           // we should be safe to remove it
+                logger.debug("tryCheckView: [{}] id={} has lastHeartbeat={}, isTimedOut={}, veryOld={}", numCnt, id, lastHeartbeat, isTimedOut, veryOld);
+                if (!isTimedOut) {
+                    logger.debug("tryCheckView: active instance: {} ({}ms old)", id, ageInMillis);
+                    activeInstanceIds.add(id);
+                } else if (veryOld) {
+                    logger.debug("tryCheckView: very old inactive instance, deleting: {} ({}ms old)", id, ageInMillis);
+                    final BasicDBObject query = new BasicDBObject("_id", id);
+                    final WriteResult removeResult = mongoDbCollection.remove(query);
+                    logger.debug("tryCheckView: result of removal: "+removeResult);
+                } else {
+                    logger.debug("tryCheckView: inactive instance: {} ({}ms old)", id, ageInMillis);
+                }
+            }
+        } finally {
+            cursor.close(); // always release the server-side cursor resources
+        }
+
+        final boolean hasCurrentView = currentViewId!=null;
+        final boolean viewsAreEqual = currentViewInstanceIds==null ? false : currentViewInstanceIds.equals(activeInstanceIds);
+        logger.debug("tryCheckView: hasCurrentView="+hasCurrentView+", viewsAreEqual="+viewsAreEqual);
+        if (!hasCurrentView || !viewsAreEqual) {
+            // no currentView yet, or it no longer matches reality - promote what we see now
+            logger.info("tryCheckView: trying to create a new currentView...");
+
+            final BasicDBObject query = new BasicDBObject("_id", "currentView");
+            if (currentViewId!=null) {
+                // restrict to only succeed if no other discovery has updated the view in the meantime
+                query.append("viewId", currentViewId);
+            }
+
+            final String newViewId = UUID.randomUUID().toString();
+            final String activeInstanceIdsAsString = SetHelper.join(activeInstanceIds, ",");
+
+            final DBObject update = new BasicDBObject("viewId", newViewId).
+                    append("instanceIds", activeInstanceIdsAsString).
+                    append("modCnt", (currentViewModCnt+1)).
+                    append("createdAt", new Date()).
+                    append("createdBy", localInstanceId);
+
+            final boolean upsert = currentViewId==null;
+            final boolean multi = false;
+            final WriteResult writeResult = mongoDbCollection.update(query, update, upsert, multi);
+            logger.debug("tryCheckView: result of write: "+writeResult);
+            if (writeResult.getN()<=0) {
+                logger.info("tryCheckView: failed promoting new view: n was <=0: "+writeResult.getN()+" (writeResult="+writeResult+")");
+                return null;
+            }
+            logger.info("tryCheckView: successfully created new view: viewId: {}, localInstanceId: {}, activeInstanceIds: {}", newViewId, localInstanceId, activeInstanceIdsAsString);
+            currentViewId = newViewId;
+        }
+
+        if (logger.isTraceEnabled()) {
+            logger.trace("tryCheckView: end (result: localInstanceId: {}, currentViewId: {}, activeInstanceIds: {})", 
+                    localInstanceId, currentViewId, SetHelper.join(activeInstanceIds, ","));
+        }
+        return new ClusterViewImpl(activeInstanceIds, localInstanceId, currentViewId);
+    }
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/HeartbeatWriter.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/HeartbeatWriter.java
new file mode 100644
index 0000000..0407547
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/HeartbeatWriter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.discovery.impl;
+
+import java.util.Date;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.WriteResult;
+
+/**
+ * Sole responsibility of this writer is to periodically and reliably write
+ * a heartbeat (which is a timestamp ie a Date) into a designated mongoDb 
+ * collection under a specific id (the localInstanceId).
+ */
+public class HeartbeatWriter {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** the mongo collection that this writer should use to store its heartbeat to **/
+    private final DBCollection mongoDbCollection;
+
+    /** the id of the local instance - which is used in the heartbeat **/
+    private final String localInstanceId;
+
+    /**
+     * Creates a new HeartbeatWriter equipped with a mongoDbCollection and a localInstanceId.
+     * <p>
+     * Note: the localInstanceId is a runtime id and must not be persisted between
+     * runs - it solely identifies a currently running and active instance for the
+     * lifetime of the instance writing heartbeats to the provided mongoDbCollection.
+     * @param mongoDbCollection the mongo collection to write heartbeats to, must not be null
+     * @param localInstanceId the runtime id of the local instance, must not be null or empty
+     */
+    HeartbeatWriter(DBCollection mongoDbCollection, String localInstanceId) {
+        if (mongoDbCollection==null) {
+            throw new IllegalArgumentException("mongoDbCollection must not be null");
+        }
+        if (localInstanceId==null || localInstanceId.length()==0) {
+            throw new IllegalArgumentException("localInstanceId must not be null or empty");
+        }
+        this.mongoDbCollection = mongoDbCollection;
+        this.localInstanceId = localInstanceId;
+    }
+
+    /** Removes the local heartbeat document - ensures other instances notice a leaving instance asap **/
+    private void removeHeartbeat() {
+        logger.trace("removeHeartbeat: start");
+        try{
+            final DBObject query = new BasicDBObject("_id", localInstanceId);
+            final WriteResult writeResult = mongoDbCollection.remove(query);
+            logger.debug("removeHeartbeat: result of remove: "+writeResult);
+        } catch(RuntimeException e) {
+            logger.error("removeHeartbeat: RuntimeException occurred: "+e, e);
+        }
+        logger.trace("removeHeartbeat: end");
+    }
+
+    /** Updates the local heartbeat document with the current timestamp **/
+    private void writeHeartbeat() {
+        logger.trace("writeHeartbeat: start");
+        try{
+            final DBObject query = new BasicDBObject("_id", localInstanceId);
+            final DBObject update = new BasicDBObject("lastHeartbeat", new Date());
+            final boolean upsert = true;
+            final boolean multi = false;
+            logger.debug("writeHeartbeat: writing heartbeat: localInstanceId: {}, update: {}", localInstanceId, update);
+            final WriteResult writeResult = mongoDbCollection.update(query, update, upsert, multi);
+            logger.debug("writeHeartbeat: result of write: "+writeResult);
+        } catch(RuntimeException e) {
+            logger.error("writeHeartbeat: RuntimeException occurred: "+e, e);
+        }
+        logger.trace("writeHeartbeat: end");
+    }
+
+    /** Triggers writing a heartbeat **/
+    void heartbeat() {
+        writeHeartbeat();
+    }
+
+    /** Triggers disposing of this writer **/
+    void dispose() {
+        removeHeartbeat();
+    }
+    
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/MongoDiscoveryService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/MongoDiscoveryService.java
new file mode 100644
index 0000000..4924317
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/MongoDiscoveryService.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.discovery.impl;
+
+import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.jackrabbit.oak.discovery.ClusterViewChangeListener;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.DBCollection;
+
+/**
+ * The MongoDiscoveryService is used to write heartbeats to a particular
+ * mongo collection, thus informing all (other) instances that are 
+ * doing the same (ie writing/reading heartbeats to the same collection)
+ * that this instance is (still) alive. This assumes that:
+ * <ul>
+ *  <li>the machine clocks are reasonably in sync - that is, they should 
+ *  be off by magnitudes less than the heartbeat timeout</li>
+ *  <li>the background job responsible for writing heartbeats can do
+ *  so even under stress</li>
+ *  <li>at no time is this instance 'paused', which prevents writing heartbeats</li>
+ *  <li>the period in which it writes heartbeats is reasonably lower than
+ *  the heartbeat timeout</li>
+ * </ul>
+ *
+ *
+ */
+@Component(immediate = true)
+@Service(value = { MongoDiscoveryService.class })
+public class MongoDiscoveryService {
+
+    private static final String PREFIX = "oak.mongodiscovery.";
+
+    /** Configure the interval (in seconds) according to which the heartbeats are written */
+    public static final long DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 10;
+    @Property(longValue = DEFAULT_HEARTBEAT_INTERVAL_SECONDS,
+            label = "Discovery Heartbeat Interval",
+            description = "Interval in seconds between heartbeat updates."
+            )
+    private static final String PROP_HEARTBEAT_INTERVAL_SEC = "heartbeatInterval";
+    
+    /** Configure the timeout (in seconds) after which an instance is considered dead/crashed. */
+    public static final long DEFAULT_HEARTBEAT_TIMEOUT_SECONDS = 60;
+    @Property(longValue = DEFAULT_HEARTBEAT_TIMEOUT_SECONDS,
+            label = "Discovery Heartbeat Timeout",
+            description = "Timeout in seconds after which an instance's heartbeat is declared timed out. Must be reasonably larger than heartbeat interval (at least 4-5 times)"
+    )
+    private static final String PROP_HEARTBEAT_TIMEOUT_SEC = "heartbeatTimeout";
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private class BackgroundJob implements Runnable {
+
+        private boolean stopped;
+        private boolean hasStopped;
+        
+        @Override
+        public void run() {
+            try{
+                while(!isStopped()) {
+                    try{
+                        heartbeat();
+                        pause();
+                    } catch(Exception e) {
+                        logger.warn("BackgroundJob.run: got an Exception: "+e, e);
+                    } catch(Error er) {
+                        logger.error("BackgroundJob.run: got an Error, quitting! Error: "+er, er);
+                        // should not swallow Errors
+                        throw er;
+                    }
+                }
+            } finally {
+                logger.info("BackgroundJob.run: quits.");
+                hasStopped();
+            }
+        }
+        
+        private synchronized void pause() {
+            try {
+                this.wait(heartbeatIntervalMillis);
+            } catch (InterruptedException e) {
+                // interrupt doubles as a stop hint here - swallowed on purpose, run() re-checks the stopped flag; re-interrupting would make every following wait() return immediately (busy spin)
+            }
+        }
+
+        private synchronized void hasStopped() {
+            hasStopped = true;
+            this.notifyAll();
+        }
+
+        private synchronized boolean isStopped() {
+            return stopped;
+        }
+
+        synchronized void stop(boolean waitUntilHasStopped) {
+            stopped = true;
+            this.notifyAll();
+            if (waitUntilHasStopped) {
+                while(!hasStopped) {
+                    logger.debug("BackgroundJob.stop: waiting for run to stop...");
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        // benign (spurious wakeup/interrupt) - the loop re-checks hasStopped before waiting again
+                    }
+                }
+                logger.debug("BackgroundJob.stop: run has stopped.");
+            }
+        }
+        
+    }
+
    /** dynamically bound listeners notified of cluster view changes; replaced copy-on-write under 'lock' (see bind/unbind) **/
    @Reference(cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, policy = ReferencePolicy.DYNAMIC, referenceInterface = ClusterViewChangeListener.class)
    private ClusterViewChangeListener[] listeners = new ClusterViewChangeListener[0];

    /** the context provided via activate **/
    private ComponentContext context;

    /** object used for synchronizing actions on the eventListeners array **/
    private final Object lock = new Object();

    /** the last clusterview that was sent to listeners - updated by the background job's heartbeat() **/
    private ClusterViewImpl lastClusterViewSent;

    /** the HeartbeatWriter used to write heartbeats **/
    private HeartbeatWriter heartbeatWriter;

    /** the HeartbeatProcessor used to check the current valid view and update if necessary **/
    private HeartbeatProcessor heartbeatProcessor;

    /** the background job that periodically issues a heartbeat() **/
    private BackgroundJob backgroundJob;

    /** the runtime, non-persisted id of the local instance (a random UUID generated in activate) **/
    private String localInstanceId;

    /** the mongo collection that is injected to this service (explicitly to avoid abuse of this collection by others) **/
    private DBCollection mongoDbCollection;

    /** the configured heartbeat interval in millis (derived from seconds-based config in activate) **/
    private long heartbeatIntervalMillis;

    /** the configured heartbeat timeout in millis (forced to at least 3x the interval in activate) **/
    private long heartbeatTimeoutMillis;
+
+    /** 
+     * Injects the given mongoDb to this MongoDiscoveryService for use for mongo heartbeats using its own collection.
+     * <p>
+     * The reason for going this unorthodox (read: non-osgi-like) path is to avoid having to expose
+     * the mongo db for any other services in the stack. This way, whoever injects the mongo DB here
+     * has full control and does this knowingly: it only injects it to the one MongoDiscoveryService
+     */
+    public void injectMongoDbCollection(DBCollection mongoDbCollection) {
+        logger.info("injectMongoDbCollection: got mongoDbCollection injected: {}", mongoDbCollection);
+        synchronized(lock) {
+            this.mongoDbCollection = mongoDbCollection;
+            init();
+        }
+    }
+
+    /** On activate the MongoDiscoveryService tries to start the background job */
+    protected void activate(ComponentContext context) {
+        logger.debug("activate: start");
+        this.context = context;
+
+        long heartbeatIntervalSeconds = toLong(prop(PROP_HEARTBEAT_INTERVAL_SEC), DEFAULT_HEARTBEAT_INTERVAL_SECONDS);
+        if (heartbeatIntervalSeconds<1) {
+            logger.info("activate: heartbeatInterval configured too low at {}millis, upping to 1sec", heartbeatIntervalSeconds);
+            heartbeatIntervalSeconds = 1;
+        }
+        heartbeatIntervalMillis = 1000 * heartbeatIntervalSeconds;
+        logger.info("activate: heartbeatIntervalMillis='{}'", this.heartbeatIntervalMillis);
+
+        long heartbeatTimeoutSeconds = toLong(prop(PROP_HEARTBEAT_TIMEOUT_SEC), DEFAULT_HEARTBEAT_TIMEOUT_SECONDS);
+        if (heartbeatTimeoutSeconds<3*heartbeatIntervalSeconds) {
+            logger.info("activate: heartbeatTimeout ({}) configured lower than 3x heartbeatInterval ({}), upping to {}", 
+                    heartbeatIntervalSeconds, heartbeatTimeoutSeconds, 3*heartbeatIntervalSeconds);
+            heartbeatTimeoutSeconds = 3*heartbeatIntervalSeconds;
+        }
+        heartbeatTimeoutMillis = 1000 * heartbeatTimeoutSeconds;
+        logger.info("activate: heartbeatTimeoutMillis='{}'", this.heartbeatTimeoutMillis);
+
+        synchronized(lock) {
+            localInstanceId = UUID.randomUUID().toString();
+            init();
+        }
+        logger.debug("activate: end");
+    }
+    
+    private Object prop(String propName) {
+        return prop(propName, PREFIX + propName);
+    }
+
+    private Object prop(String propName, String fwkPropName) {
+        //Prefer framework property first
+        Object value = context.getBundleContext().getProperty(fwkPropName);
+        if (value != null) {
+            return value;
+        }
+
+        //Fallback to one from config
+        return context.getProperties().get(propName);
+    }
+
+    /** tries to start the background job - if all relevant parts are set **/
+    private void init() {
+        logger.trace("init: start");
+        if (mongoDbCollection==null) {
+            logger.debug("init: cannot initialize yet, mongoDbCollection not yet injected");
+            return;
+        }
+        if (localInstanceId==null) {
+            logger.debug("init: cannot initialize yet, localInstanceid not yet set");
+            return;
+        }
+        
+        logger.info("init: starting heartbeat writer with id {}, interval {}ms and processor with timeout {}ms", localInstanceId, heartbeatIntervalMillis, heartbeatTimeoutMillis);
+        
+        heartbeatWriter = new HeartbeatWriter(mongoDbCollection, localInstanceId);
+        heartbeatProcessor = new HeartbeatProcessor(mongoDbCollection, localInstanceId, heartbeatTimeoutMillis, 5 /* currently hardcoded */);
+        startBackgroundJob();
+        logger.trace("init: end");
+    }
+    
    /** On deactivate the background job is stopped - if it was running at all **/
    // NOTE(review): stopBackgroundJob() blocks until the job thread terminates
    // and is deliberately invoked BEFORE taking 'lock' - the job's sendEvent()
    // takes 'lock' itself, so waiting while holding the lock could deadlock.
    // Confirm this ordering is intended.
    protected void deactivate(ComponentContext context) {
        logger.debug("deactivate: deactivated");
        stopBackgroundJob();

        synchronized(lock) {
            backgroundJob = null;
            if (heartbeatWriter!=null) {
                logger.trace("deactivate: disposing heartbeatWriter");
                heartbeatWriter.dispose();
                heartbeatWriter = null;
            } else {
                logger.trace("deactivate: heartbeatWriter was not set");
            }
            // the processor is dropped without a dispose() call - presumably it
            // holds no resources of its own; verify against HeartbeatProcessor
            heartbeatProcessor = null;
        }
        logger.debug("deactivate: end");
    }
+    
+    /** Start the background job, which is responsible for periodically issuing a heartbeat() **/
+    private void startBackgroundJob() {
+        logger.debug("startBackgroundJob: start");
+        if (backgroundJob!=null) {
+            // if start is called twice, it stops first, then starts again
+            stopBackgroundJob();
+        }
+        backgroundJob = new BackgroundJob();
+        Thread th = new Thread(backgroundJob, "MongoDiscoveryService-BackgroundJob");
+        th.setDaemon(true);
+        th.start();
+        logger.debug("startBackgroundJob: end");
+    }
+    
+    /** Stops the background job if it was running - note that this waits for the background job to actually stop. **/
+    private void stopBackgroundJob() {
+        logger.debug("stopBackgroundJob: start");
+        if (backgroundJob!=null) {
+            logger.debug("stopBackgroundJob: stopping background job...");
+            backgroundJob.stop(true);
+            logger.debug("stopBackgroundJob: stopped background job");
+        }
+        logger.debug("stopBackgroundJob: end");
+    }
+
+    /**
+     * bind a cluster event listener
+     */
+    protected void bindClusterViewChangeListener(final ClusterViewChangeListener listener) {
+        logger.debug("bindClusterViewChangeListener: start. eventListener: {}", listener);
+        synchronized (lock) {
+            final List<ClusterViewChangeListener> currentList = new ArrayList<ClusterViewChangeListener>(
+                    Arrays.asList(listeners));
+            currentList.add(listener);
+            this.listeners = currentList
+                    .toArray(new ClusterViewChangeListener[currentList.size()]);
+        }
+        if (lastClusterViewSent!=null) {
+            try{
+                logger.trace("bindClusterViewChangeListener: sending event to listener: {} (view: {})", listener, lastClusterViewSent);
+                listener.handleClusterViewChange(lastClusterViewSent);
+                logger.trace("bindClusterViewChangeListener: sending event to listener done");
+            } catch(RuntimeException re) {
+                logger.warn("bindClusterViewChangeListener: listener threw RuntimeException: "+re, re);
+            }
+        }
+        logger.debug("bindClusterViewChangeListener: end");
+    }
+    
+    /**
+     * unbind a cluster event listener
+     */
+    protected void unbindClusterViewChangeListener(final ClusterViewChangeListener listener) {
+        logger.debug("unbindClusterViewChangeListener: start. eventListener: {}", listener);
+        synchronized (lock) {
+            final List<ClusterViewChangeListener> currentList = new ArrayList<ClusterViewChangeListener>(
+                    Arrays.asList(listeners));
+            currentList.remove(listener);
+            this.listeners = currentList
+                    .toArray(new ClusterViewChangeListener[currentList.size()]);
+        }
+        logger.debug("unbindClusterViewChangeListener: end");
+    }
+    
    /** 
     * Issue one heartbeat: this first writes its own heartbeat (updates the
     * corresponding document with _id => localInstanceId) and then checks the
     * view, if it still matches what it determines from all the heartbeats (checks the timeout).
     * Should the view have changed, this method invokes sendEvent() to inform any
     * registered ClusterViewChangeListeners.
     * <p>
     * NOTE(review): runs on the background job's thread and reads/writes
     * lastClusterViewSent without holding 'lock', while
     * bindClusterViewChangeListener reads the same field - confirm this is
     * intended.
     */
    private void heartbeat() {
        logger.trace("heartbeat: start");
        if (heartbeatWriter!=null) {
            heartbeatWriter.heartbeat();
        }
        if (heartbeatProcessor!=null) {
            final ClusterViewImpl newView = heartbeatProcessor.checkView();
            logger.trace("heartbeat: current view is: {} (last known view is: {})", newView, lastClusterViewSent);
            if (lastClusterViewSent==null || !lastClusterViewSent.getActiveClusterInstanceIds().equals(newView.getActiveClusterInstanceIds())
                    || !lastClusterViewSent.getId().equals(newView.getId())) {
                // if lastClusterViewSent is not yet set, this is the first event - thus we always report
                // otherwise, we have to report a change if:
                //   either the clusterInstances are not equal
                //       OR the viewId is different
                // Note on the 'viewId is different': the viewId is only changed
                // if there is an actual change in the members - thus if the
                // clusterInstances are equal, you could argue that the viewId does not matter.
                // That however is not true: another instance could have noticed a coming and going
                // instance, thus flipping between another view, while this instance did not
                // take note of this. So from this instance' point of view, the change was not
                // noticed. However, the other instance might have reported to listeners that
                // an instance left/joined - immediately followed by the reversing action.
                // thus upper layer should here be informed that the viewId has changed and 
                // that they should take appropriate action (the 'other' instance that noticed
                // the flipping should have been in TOPOLOGY_CHANGING and waiting for sync tokens
                // which this instance would only send with the new viewId - so the other instance
                // would eventually also take note of the new viewId and wait for this new one
                // accordingly).
                // so in any case: whenever the viewId changes, this must be reported to
                // listeners (eventually the changing should stop and we should have a stable
                // view - otherwise this algorithm would indeed fail)
                logger.info("heartbeat: view changed from {} to {}", lastClusterViewSent, newView);
                sendEvent(newView);
                lastClusterViewSent = newView;
            } else {
                logger.trace("heartbeat: no change in view");
            }
        }
        logger.trace("heartbeat: end");
    }
+
+    /** sends a new view to the registered ClusterViewChangeListeners **/
+    private void sendEvent(ClusterViewImpl view) {
+        logger.trace("sendEvent: start. event: {}", view);
+        final ClusterViewChangeListener[] list;
+        synchronized(lock) {
+            list = Arrays.copyOf(listeners, listeners.length);
+        }
+        for (int i = 0; i < list.length; i++) {
+            ClusterViewChangeListener listener = list[i];
+            try{
+                logger.trace("sendEvent: sending event to listener: {}. (view: {})", listener, view);
+                listener.handleClusterViewChange(view);
+                logger.trace("sendEvent: sending event to listener done");
+            } catch(RuntimeException re) {
+                logger.warn("sendEvent: listener threw RuntimeException: "+re, re);
+            }
+        }
+        logger.trace("sendEvent: end");
+    }
+
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/SetHelper.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/SetHelper.java
new file mode 100644
index 0000000..e24e2c3
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/discovery/impl/SetHelper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.discovery.impl;
+
+import java.util.Iterator;
+import java.util.Set;
+
+/** Helper class for some Set operations **/
/** Helper class for some Set operations **/
public class SetHelper {

    /** utility class, not meant to be instantiated **/
    private SetHelper() {
    }

    /**
     * Composes a String with the toString() of each element of the provided
     * set, separated by the provided separator.
     *
     * @param set the set whose elements are joined (iteration order decides the output order)
     * @param separator the separator placed between two consecutive elements
     * @return the joined String, empty if the set is empty
     */
    static String join(Set<String> set, String separator) {
        final StringBuilder sb = new StringBuilder();
        for (String anId : set) {
            if (sb.length() != 0) {
                // bug fix: the separator parameter was ignored and "," was
                // hard-coded here
                sb.append(separator);
            }
            sb.append(anId);
        }
        return sb.toString();
    }

}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
index defcd28..7c92aee 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
@@ -41,9 +41,11 @@ import java.util.concurrent.TimeUnit;
 import javax.sql.DataSource;
 
 import com.mongodb.DB;
+import com.mongodb.DBCollection;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoClientURI;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.ConfigurationPolicy;
@@ -58,6 +60,7 @@ import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
+import org.apache.jackrabbit.oak.discovery.impl.MongoDiscoveryService;
 import org.apache.jackrabbit.oak.osgi.ObserverTracker;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.plugins.blob.BlobGC;
@@ -237,6 +240,10 @@ public class DocumentNodeStoreService {
     )
     private volatile DataSource blobDataSource;
 
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+            policy = ReferencePolicy.DYNAMIC)
+    private volatile MongoDiscoveryService mongoDiscoveryService;
+
     private DocumentMK mk;
     private ObserverTracker observerTracker;
     private ComponentContext context;
@@ -387,6 +394,18 @@ public class DocumentNodeStoreService {
 
             MongoClient client = new MongoClient(mongoURI);
             DB mongoDB = client.getDB(db);
+            
+            if (mongoDiscoveryService!=null) {
+                log.info("registerNodeStore: mongoDiscoveryService is set - injecting mongoDb");
+                final String DISCOVERY_COLLECTION_NAME = "discovery";
+                DBCollection collection = mongoDB.getCollection(DISCOVERY_COLLECTION_NAME);
+                if (collection==null) {
+                    collection = mongoDB.createCollection(DISCOVERY_COLLECTION_NAME, null);
+                }
+                mongoDiscoveryService.injectMongoDbCollection(collection);
+            } else {
+                log.warn("registerNodeStore: mongoDiscoveryService not set, cannot inject");
+            }
 
             mkBuilder.setMaxReplicationLag(maxReplicationLagInSecs, TimeUnit.SECONDS);
             mkBuilder.setMongoDB(mongoDB, changesSize, blobCacheSize);
@@ -445,6 +464,13 @@ public class DocumentNodeStoreService {
     }
 
     @SuppressWarnings("UnusedDeclaration")
    /**
     * Binds the (optional, dynamic) MongoDiscoveryService and retries the node
     * store registration, which will inject the discovery collection into it.
     * <p>
     * NOTE(review): the corresponding @Reference is declared with
     * ReferencePolicy.DYNAMIC, but no matching unbind method is visible here -
     * confirm the field is cleared correctly when the service goes away.
     */
    protected void bindMongoDiscoveryService(MongoDiscoveryService mongoDiscoveryService) throws IOException {
        log.info("Initializing DocumentNodeStore with MongoDiscoveryService [{}]", mongoDiscoveryService);
        this.mongoDiscoveryService = mongoDiscoveryService;
        registerNodeStoreIfPossible();
    }
+
+    @SuppressWarnings("UnusedDeclaration")
     protected void bindBlobStore(BlobStore blobStore) throws IOException {
         log.info("Initializing DocumentNodeStore with BlobStore [{}]", blobStore);
         this.blobStore = blobStore;
