diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
index 7e23241..d3d597f 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
@@ -29,7 +29,10 @@ import java.io.Closeable;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -91,6 +94,10 @@ public class BackgroundObserver implements Observer, Closeable {
     private final boolean alwaysCollapseExternalEvents =
             Boolean.parseBoolean(System.getProperty("oak.observation.alwaysCollapseExternal", "false"));
 
+    private final WaiterStateProvidingLatch closeLatch = new WaiterStateProvidingLatch(1);
+
+    private final int closeTimeoutSecs = Integer.getInteger("oak.observation.closeTimeout", 60);
+
     private static class ContentChange {
         private final NodeState root;
         private final CommitInfo info;
@@ -127,7 +134,11 @@ public class BackgroundObserver implements Observer, Closeable {
             public Void call() throws Exception {
                 try {
                     ContentChange change = queue.poll();
-                    if (change != null && change != STOP) {
+                    if (change == STOP){
+                        closeLatch.countDown();
+                        return null;
+                    }
+                    if (change != null) {
                         observer.contentChanged(change.root, change.info);
                         currentTask.onComplete(completionHandler);
                     }
@@ -209,6 +220,23 @@ public class BackgroundObserver implements Observer, Closeable {
         queue.clear();
         queue.add(STOP);
         stopped = true;
+
+        try {
+            if (!closeLatch.await(closeTimeoutSecs, TimeUnit.SECONDS)){
+                getLogger(observer).warn("Giving up waiting for close for {} after {}s", observer, closeTimeoutSecs);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Exposed for testing purposed to allow tests to
+     * assert that thread did wait upon close
+     * @return true if any thread is waiting in close call
+     */
+    boolean isClosing(){
+        return closeLatch.hasQueuedThread();
     }
 
     @Nonnull
@@ -302,4 +330,32 @@ public class BackgroundObserver implements Observer, Closeable {
     private static Logger getLogger(@Nonnull Observer observer) {
         return LoggerFactory.getLogger(checkNotNull(observer).getClass());
     }
+
+    /**
+     * Latch which also provides information around if any thread is
+     * waiting on it. This is similar to {@link Semaphore#hasQueuedThreads()}
+     */
+    private static class WaiterStateProvidingLatch extends CountDownLatch {
+        private volatile boolean waiting;
+
+        public WaiterStateProvidingLatch(int count) {
+            super(count);
+        }
+
+        @Override
+        public void await() throws InterruptedException {
+            waiting = true;
+            super.await();
+        }
+
+        @Override
+        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+            waiting = true;
+            return super.await(timeout, unit);
+        }
+
+        public boolean hasQueuedThread() {
+            return waiting;
+        }
+    }
 }
diff --git oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
index a889a03..096e4ae 100644
--- oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
@@ -28,12 +28,14 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.junit.Test;
@@ -65,6 +67,42 @@ public class BackgroundObserverTest {
         }
     }
 
+    @Test
+    public void waitForClose() throws Exception{
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        final CountDownLatch continueLatch = new CountDownLatch(1);
+        final CountDownLatch receivedEventLatch = new CountDownLatch(1);
+        final BackgroundObserver bg = new BackgroundObserver(new Observer() {
+            @Override
+            public void contentChanged(@Nonnull NodeState root, CommitInfo info) {
+                if ("s1".equals(info.getSessionId())){
+                    receivedEventLatch.countDown();
+                    Uninterruptibles.awaitUninterruptibly(continueLatch);
+                }
+            }
+        }, executor, 10);
+
+        bg.contentChanged(EMPTY_NODE, new CommitInfo("s1", null));
+
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                bg.close();
+            }
+        });
+        t.start();
+
+        //Wait till close call enters wait state
+        while(!bg.isClosing());
+
+        //Let the thread complete
+        continueLatch.countDown();
+
+        //Wait for close to return
+        t.join();
+
+    }
+
     private static void contentChanged(Observer observer, long value) {
         NodeState node = EMPTY_NODE.builder().setProperty("p", value).getNodeState();
         observer.contentChanged(node, COMMIT_INFO);
