Index: oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java
===================================================================
--- oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java	(revision 1783866)
+++ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java	(working copy)
@@ -48,8 +48,10 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static javax.jcr.observation.Event.NODE_ADDED;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(Parameterized.class)
@@ -258,4 +260,62 @@
             blockObservation.release();
         }
     }
+    
+    @Test
+    public void testQueueFullThenFlushing() throws Exception {
+        final Semaphore semaphore = new Semaphore(0);
+        final AtomicLong counter = new AtomicLong(0);
+        EventListener listeners = new EventListener() {
+            
+            @Override
+            public void onEvent(EventIterator events) {
+                try {
+                    semaphore.acquire();
+                } catch (InterruptedException e) {
+                    throw new Error(e);
+                } finally {
+                    long numEvents = events.getSize();
+                    counter.addAndGet(numEvents);
+                    System.out.println("GOT: "+numEvents + " - COUNTER: "+counter.get());
+                    while(events.hasNext()) {
+                        Event e = events.nextEvent();
+                        System.out.println(" - " + e);
+                    }
+                }
+            }
+        };
+        Node root = observingSession.getNode("/");
+        root.addNode("testNode");
+        observingSession.save();
+
+        observationManager.addEventListener(listeners, Event.PROPERTY_ADDED, "/", true, null, null, false);
+        
+        // send out 7 events (or in general: queue length + 2):
+        // event #0 will get delivered but stalls at the listener (queue empty though)
+        // event #1-#6 will fill the queue
+        // event #7 will not fit in the queue anymore (queue full)
+        for(int i=0; i<OBS_QUEUE_LENGTH + 2; i++) {
+            root = observingSession.getNode("/");
+            root.getNode("testNode").setProperty("p"+i, i);
+            System.out.println("storing: /testNode/p"+i);
+            observingSession.save();
+        }
+        
+        // release the listener
+        semaphore.release(100); // ensure acquire will no longer block during this test -> pass 100
+        
+        // wait some time to allow the listener to get the events delivered
+        Thread.sleep(2000);
+        
+        System.out.println("COUNTER: "+counter.get());
+        assertEquals(OBS_QUEUE_LENGTH + 2, counter.get());
+        
+        root = observingSession.getNode("/");
+        root.getNode("testNode").setProperty("p"+(OBS_QUEUE_LENGTH + 2), (OBS_QUEUE_LENGTH + 2));
+        System.out.println("storing: /testNode/p"+(OBS_QUEUE_LENGTH + 2));
+        observingSession.save();
+        
+        Thread.sleep(1000);
+        assertEquals(OBS_QUEUE_LENGTH + 3, counter.get());
+    }
 }

