diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BackgroundReadStats.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BackgroundReadStats.java
new file mode 100644
index 0000000..b734284
--- /dev/null
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BackgroundReadStats.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
+
+class BackgroundReadStats {
+    CacheInvalidationStats cacheStats;
+    long readHead;
+    long cacheInvalidationTime;
+    long populateDiffCache;
+    long lock;
+    long dispatchChanges;
+    long totalReadTime;
+    public long numExternalChanges;
+
+    //TODO external diff size
+
+    @Override
+    public String toString() {
+        String cacheStatsMsg = "NOP";
+        if (cacheStats != null){
+            cacheStatsMsg = cacheStats.summaryReport();
+        }
+        return  "ReadStats{" +
+                "cacheStats:" + cacheStatsMsg +
+                ", head:" + readHead +
+                ", cache:" + cacheInvalidationTime +
+                ", diff: " + populateDiffCache +
+                ", lock:" + lock +
+                ", dispatch:" + dispatchChanges +
+                '}';
+    }
+}
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteStats.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteStats.java
index 70b69d8..a69bddf 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteStats.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/BackgroundWriteStats.java
@@ -26,6 +26,7 @@ class BackgroundWriteStats {
     long lock;
     long write;
     long num;
+    long totalWriteTime;
 
     @Override
     public String toString() {
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
index 475d4f4..4556d42 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentMK.java
@@ -528,6 +528,7 @@ public class DocumentMK {
         private BlobStoreStats blobStoreStats;
         private CacheStats blobStoreCacheStats;
         private DocumentStoreStatsCollector documentStoreStatsCollector;
+        private DocumentNodeStoreStatsCollector nodeStoreStatsCollector;
         private Map<CacheType, PersistentCacheStats> persistentCacheStats =
                 new EnumMap<CacheType, PersistentCacheStats>(CacheType.class);
 
@@ -911,6 +912,18 @@ public class DocumentMK {
             return this;
         }
 
+        public DocumentNodeStoreStatsCollector getNodeStoreStatsCollector() {
+            if (nodeStoreStatsCollector == null) {
+                nodeStoreStatsCollector = new DocumentNodeStoreStats(statisticsProvider);
+            }
+            return nodeStoreStatsCollector;
+        }
+
+        public Builder setNodeStoreStatsCollector(DocumentNodeStoreStatsCollector statsCollector) {
+            this.nodeStoreStatsCollector = statsCollector;
+            return this;
+        }
+
         @Nonnull
         public Map<CacheType, PersistentCacheStats> getPersistenceCacheStats() {
             return persistentCacheStats;
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
index 97974e2..a4bafb0 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
@@ -85,7 +85,6 @@ import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
 import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector;
 import org.apache.jackrabbit.oak.plugins.blob.ReferencedBlob;
 import org.apache.jackrabbit.oak.plugins.document.Branch.BranchCommit;
-import org.apache.jackrabbit.oak.plugins.document.cache.CacheInvalidationStats;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCache;
 import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig;
 import org.apache.jackrabbit.oak.plugins.document.util.ReadOnlyDocumentStoreWrapperFactory;
@@ -409,8 +408,11 @@ public final class DocumentNodeStore
 
     private final boolean readOnlyMode;
 
+    private final DocumentNodeStoreStatsCollector nodeStoreStatsCollector;
+
     public DocumentNodeStore(DocumentMK.Builder builder) {
         this.blobStore = builder.getBlobStore();
+        this.nodeStoreStatsCollector = builder.getNodeStoreStatsCollector();
         if (builder.isUseSimpleRevision()) {
             this.simpleRevisionCounter = new AtomicInteger(0);
         }
@@ -1718,6 +1720,7 @@ public final class DocumentNodeStore
     }
 
     private void internalRunBackgroundUpdateOperations() {
+        BackgroundWriteStats stats = null;
         synchronized (backgroundWriteMonitor) {
             long start = clock.getTime();
             long time = start;
@@ -1730,17 +1733,19 @@ public final class DocumentNodeStore
             backgroundSplit();
             long splitTime = clock.getTime() - time;
             // write back pending updates to _lastRev
-            BackgroundWriteStats stats = backgroundWrite();
+            stats = backgroundWrite();
             stats.split = splitTime;
             stats.clean = cleanTime;
+            stats.totalWriteTime = clock.getTime() - start;
             String msg = "Background operations stats ({})";
-            if (clock.getTime() - start > TimeUnit.SECONDS.toMillis(10)) {
+            if (stats.totalWriteTime > TimeUnit.SECONDS.toMillis(10)) {
                 // log as info if it took more than 10 seconds
                 LOG.info(msg, stats);
             } else {
                 LOG.debug(msg, stats);
             }
         }
+        nodeStoreStatsCollector.doneBackgroundUpdate(stats);
     }
 
     //----------------------< background read operations >----------------------
@@ -1763,19 +1768,21 @@ public final class DocumentNodeStore
 
     /** OAK-2624 : background read operations are split from background update ops */
     private void internalRunBackgroundReadOperations() {
+        BackgroundReadStats readStats = null;
         synchronized (backgroundReadMonitor) {
             long start = clock.getTime();
             // pull in changes from other cluster nodes
-            BackgroundReadStats readStats = backgroundRead();
-            long readTime = clock.getTime() - start;
+            readStats = backgroundRead();
+            readStats.totalReadTime = clock.getTime() - start;
             String msg = "Background read operations stats (read:{} {})";
             if (clock.getTime() - start > TimeUnit.SECONDS.toMillis(10)) {
                 // log as info if it took more than 10 seconds
-                LOG.info(msg, readTime, readStats);
+                LOG.info(msg, readStats.totalReadTime, readStats);
             } else {
-                LOG.debug(msg, readTime, readStats);
+                LOG.debug(msg, readStats.totalReadTime, readStats);
             }
         }
+        nodeStoreStatsCollector.doneBackgroundRead(readStats);
     }
 
     /**
@@ -1899,6 +1906,7 @@ public final class DocumentNodeStore
                 } else {
                     try {
                         externalSort.sort();
+                        stats.numExternalChanges = externalSort.getSize();
                         stats.cacheStats = store.invalidateCache(pathToId(externalSort));
                         // OAK-3002: only invalidate affected items (using journal)
                         long origSize = docChildrenCache.size();
@@ -1965,31 +1973,6 @@ public final class DocumentNodeStore
         return stats;
     }
 
-    private static class BackgroundReadStats {
-        CacheInvalidationStats cacheStats;
-        long readHead;
-        long cacheInvalidationTime;
-        long populateDiffCache;
-        long lock;
-        long dispatchChanges;
-
-        @Override
-        public String toString() {
-            String cacheStatsMsg = "NOP";
-            if (cacheStats != null){
-                cacheStatsMsg = cacheStats.summaryReport();
-            }
-            return  "ReadStats{" +
-                    "cacheStats:" + cacheStatsMsg +
-                    ", head:" + readHead +
-                    ", cache:" + cacheInvalidationTime +
-                    ", diff: " + populateDiffCache +
-                    ", lock:" + lock +
-                    ", dispatch:" + dispatchChanges +
-                    '}';
-        }
-    }
-
     private void cleanOrphanedBranches() {
         Branch b;
         while ((b = branches.pollOrphanedBranch()) != null) {
@@ -2778,4 +2761,8 @@ public final class DocumentNodeStore
     public String getInstanceId() {
         return String.valueOf(getClusterId());
     }
+
+    public DocumentNodeStoreStatsCollector getStatsCollector() {
+        return nodeStoreStatsCollector;
+    }
 }
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
index 6e99422..a963adb 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreBranch.java
@@ -155,6 +155,7 @@ class DocumentNodeStoreBranch implements NodeStoreBranch {
         Set<Revision> conflictRevisions = new HashSet<Revision>();
         long time = System.currentTimeMillis();
         int numRetries = 0;
+        boolean suspended= false;
         for (long backoff = MIN_BACKOFF; backoff <= maximumBackoff; backoff *= 2) {
             if (ex != null) {
                 try {
@@ -166,6 +167,7 @@ class DocumentNodeStoreBranch implements NodeStoreBranch {
                         // suspend until conflicting revision is visible
                         LOG.debug("Suspending until {} is visible. Current head {}.",
                                 conflictRevisions, store.getHeadRevision());
+                        suspended = true;
                         store.suspendUntilAll(conflictRevisions);
                         conflictRevisions.clear();
                         LOG.debug("Resumed. Current head {}.", store.getHeadRevision());
@@ -179,8 +181,10 @@ class DocumentNodeStoreBranch implements NodeStoreBranch {
                 }
             }
             try {
-                return branchState.merge(checkNotNull(hook),
+                NodeState result = branchState.merge(checkNotNull(hook),
                         checkNotNull(info), exclusive);
+                store.getStatsCollector().doneMerge(numRetries, System.currentTimeMillis() - time, suspended, exclusive);
+                return result;
             } catch (FailedWithConflictException e) {
                 ex = e;
                 conflictRevisions.addAll(e.getConflictRevisions());
@@ -198,6 +202,7 @@ class DocumentNodeStoreBranch implements NodeStoreBranch {
         }
         // if we get here retrying failed
         time = System.currentTimeMillis() - time;
+        store.getStatsCollector().failedMerge(numRetries, time, suspended, exclusive);
         String msg = ex.getMessage() + " (retries " + numRetries + ", " + time + " ms)";
         throw new CommitFailedException(ex.getSource(), ex.getType(),
                 ex.getCode(), msg, ex.getCause());
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreStats.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreStats.java
new file mode 100644
index 0000000..5d2df9d
--- /dev/null
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreStats.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.stats.HistogramStats;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+
+public class DocumentNodeStoreStats implements DocumentNodeStoreStatsCollector {
+    private static final String BGR_READ_HEAD = "DOCUMENT_NS_BGR_READ_HEAD";
+    private static final String BGR_CACHE_INVALIDATE = "DOCUMENT_NS_BGR_CACHE_INVALIDATE";
+    private static final String BGR_DIFF_CACHE = "DOCUMENT_NS_BGR_DIFF_CACHE";
+    private static final String BGR_LOCK = "DOCUMENT_NS_BGR_LOCK";
+    private static final String BGR_DISPATCH = "DOCUMENT_NS_BGR_DISPATCH";
+    private static final String BGR_TOTAL_TIME = "DOCUMENT_NS_BGR_TOTAL_TIME";
+    private static final String BGR_NUM_CHANGES_RATE = "DOCUMENT_NS_BGR_NUM_CHANGES_RATE";
+    private static final String BGR_NUM_CHANGES_HISTO = "DOCUMENT_NS_BGR_NUM_CHANGES_HISTO";
+
+    private static final String BGW_CLEAN = "DOCUMENT_NS_BGW_CLEAN";
+    private static final String BGW_SPLIT = "DOCUMENT_NS_BGW_SPLIT";
+    private static final String BGW_WRITE = "DOCUMENT_NS_BGW_LOCK";
+    private static final String BGW_NUM = "DOCUMENT_NS_BGW_NUM";
+    private static final String BGW_TOTAL = "DOCUMENT_NS_BGW_TOTAL_TIME";
+
+    private static final String MERGE_SUCCESS_NUM_RETRY = "DOCUMENT_NS_MERGE_SUCCESS_RETRY";
+    private static final String MERGE_SUCCESS_COUNT = "DOCUMENT_NS_MERGE_SUCCESS_COUNT";
+    private static final String MERGE_SUCCESS_TIME = "DOCUMENT_NS_MERGE_SUCCESS_TIME";
+    private static final String MERGE_SUCCESS_SUSPENDED = "DOCUMENT_NS_MERGE_SUCCESS_SUSPENDED";
+    private static final String MERGE_SUCCESS_EXCLUSIVE = "DOCUMENT_NS_MERGE_SUCCESS_EXCLUSIVE";
+
+    private static final String MERGE_FAILED_EXCLUSIVE = "DOCUMENT_NS_MERGE_FAILED_EXCLUSIVE";
+
+    private final TimerStats readHead;
+    private final TimerStats readCacheInvalidate;
+    private final TimerStats readDiffCache;
+    private final TimerStats readLock;
+    private final TimerStats readDispatch;
+    private final TimerStats readTotalTime;
+    private final MeterStats numChangesRate;
+    private final HistogramStats numChangesHisto;
+
+    private final TimerStats writeClean;
+    private final TimerStats writeSplit;
+    private final HistogramStats writeNum;
+    private final TimerStats writeWrite;
+    private final TimerStats writeTotal;
+
+    private final HistogramStats mergeSuccessRetries;
+    private final MeterStats mergeSuccessRate;
+    private final TimerStats mergeSuccessTime;
+    private final MeterStats mergeSuccessExclusive;
+    private final MeterStats mergeSuccessSuspended;
+
+    private final MeterStats mergeFailedExclusive;
+
+
+    public DocumentNodeStoreStats(StatisticsProvider sp) {
+        readHead = sp.getTimer(BGR_READ_HEAD, StatsOptions.METRICS_ONLY);
+        readCacheInvalidate = sp.getTimer(BGR_CACHE_INVALIDATE, StatsOptions.METRICS_ONLY);
+        readDiffCache = sp.getTimer(BGR_DIFF_CACHE, StatsOptions.METRICS_ONLY);
+        readLock = sp.getTimer(BGR_LOCK, StatsOptions.METRICS_ONLY);
+        readDispatch = sp.getTimer(BGR_DISPATCH, StatsOptions.METRICS_ONLY);
+        readTotalTime = sp.getTimer(BGR_TOTAL_TIME, StatsOptions.METRICS_ONLY);
+        numChangesRate = sp.getMeter(BGR_NUM_CHANGES_RATE, StatsOptions.DEFAULT); //Enable time series
+        numChangesHisto = sp.getHistogram(BGR_NUM_CHANGES_HISTO, StatsOptions.METRICS_ONLY);
+
+        writeClean = sp.getTimer(BGW_CLEAN, StatsOptions.METRICS_ONLY);
+        writeSplit = sp.getTimer(BGW_SPLIT, StatsOptions.METRICS_ONLY);
+        writeWrite = sp.getTimer(BGW_WRITE, StatsOptions.METRICS_ONLY);
+        writeTotal = sp.getTimer(BGW_TOTAL, StatsOptions.METRICS_ONLY);
+        writeNum = sp.getHistogram(BGW_NUM, StatsOptions.DEFAULT);
+
+        mergeSuccessRetries = sp.getHistogram(MERGE_SUCCESS_NUM_RETRY, StatsOptions.METRICS_ONLY);
+        mergeSuccessRate = sp.getMeter(MERGE_SUCCESS_COUNT, StatsOptions.DEFAULT); //Enable time series
+        mergeSuccessTime = sp.getTimer(MERGE_SUCCESS_TIME, StatsOptions.METRICS_ONLY);
+        mergeSuccessExclusive = sp.getMeter(MERGE_SUCCESS_EXCLUSIVE, StatsOptions.METRICS_ONLY);
+        mergeSuccessSuspended = sp.getMeter(MERGE_SUCCESS_SUSPENDED, StatsOptions.METRICS_ONLY);
+
+        mergeFailedExclusive = sp.getMeter(MERGE_FAILED_EXCLUSIVE, StatsOptions.DEFAULT); //Enable time series
+    }
+
+    @Override
+    public void doneBackgroundRead(BackgroundReadStats stats) {
+        readHead.update(stats.readHead, TimeUnit.MILLISECONDS);
+        readCacheInvalidate.update(stats.cacheInvalidationTime, TimeUnit.MILLISECONDS);
+        readDiffCache.update(stats.populateDiffCache, TimeUnit.MILLISECONDS);
+        readLock.update(stats.lock, TimeUnit.MILLISECONDS);
+        readDispatch.update(stats.dispatchChanges, TimeUnit.MILLISECONDS);
+        readTotalTime.update(stats.totalReadTime, TimeUnit.MILLISECONDS);
+
+        numChangesRate.mark(stats.numExternalChanges);
+        numChangesHisto.update(stats.numExternalChanges);
+    }
+
+    @Override
+    public void doneBackgroundUpdate(BackgroundWriteStats stats) {
+        writeClean.update(stats.clean, TimeUnit.MILLISECONDS);
+        writeSplit.update(stats.split, TimeUnit.MILLISECONDS);
+        writeWrite.update(stats.write, TimeUnit.MILLISECONDS);
+        writeTotal.update(stats.totalWriteTime, TimeUnit.MILLISECONDS);
+
+        writeNum.update(stats.num);
+    }
+
+    @Override
+    public void doneMerge(int numRetries, long timeTaken, boolean suspended, boolean exclusive) {
+        mergeSuccessRate.mark();
+        mergeSuccessRetries.update(numRetries);
+        mergeSuccessTime.update(timeTaken, TimeUnit.MILLISECONDS);
+
+        if (exclusive) {
+            mergeSuccessExclusive.mark();
+        }
+
+        if (suspended) {
+            mergeSuccessSuspended.mark();;
+        }
+    }
+
+    @Override
+    public void failedMerge(int numRetries, long time, boolean suspended, boolean exclusive) {
+        if (exclusive){
+            mergeFailedExclusive.mark();
+        }
+    }
+}
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreStatsCollector.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreStatsCollector.java
new file mode 100644
index 0000000..fdf7cf9
--- /dev/null
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreStatsCollector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+public interface DocumentNodeStoreStatsCollector {
+    void doneBackgroundRead(BackgroundReadStats stats);
+
+    void doneBackgroundUpdate(BackgroundWriteStats stats);
+
+    void doneMerge(int numRetries, long timeTaken, boolean suspended, boolean exclusive);
+
+    void failedMerge(int numRetries, long time, boolean suspended, boolean exclusive);
+}
