diff --git a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java index 046a98c..1a7231c 100644 --- a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java +++ b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java @@ -20,6 +20,8 @@ package org.apache.jackrabbit.oak.jcr.observation; import ch.qos.logback.classic.Level; import org.apache.jackrabbit.api.JackrabbitRepository; +import org.apache.jackrabbit.api.observation.JackrabbitEvent; +import org.apache.jackrabbit.oak.commons.PathUtils; import org.apache.jackrabbit.oak.commons.junit.LogCustomizer; import org.apache.jackrabbit.oak.fixture.NodeStoreFixture; import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest; @@ -219,18 +221,6 @@ public class ObservationQueueFullWarnTest extends AbstractRepositoryTest { return true; } - //Add another node only when num_pending_to_be_observed nodes is - //less that observation queue. This is done to let all observation finish - //up in case last few event were dropped due to full observation queue - //(which is ok as the next event that comes in gets diff-ed with last - //processed revision) - if (numAddedNodes.get() < numObservedNodes.get() + OBS_QUEUE_LENGTH) { - try { - addANode("addedWhileWaiting"); - } catch (RepositoryException e) { - LOG.warn("exception while adding during wait: {}", e); - } - } Thread.sleep(OBS_TIMEOUT_PER_ITEM/10);//The constant is exaggerated remaining = end - System.currentTimeMillis(); } @@ -260,62 +250,102 @@ public class ObservationQueueFullWarnTest extends AbstractRepositoryTest { blockObservation.release(); } } - + @Test public void testQueueFullThenFlushing() throws Exception { final Semaphore semaphore = new Semaphore(0); final AtomicLong counter = new AtomicLong(0); + final AtomicLong localCounter = new AtomicLong(0); EventListener listeners = new EventListener() { - + @Override public void onEvent(EventIterator events) { try { semaphore.acquire(); - } catch (InterruptedException e) { - throw new Error(e); - } finally { long numEvents = events.getSize(); counter.addAndGet(numEvents); System.out.println("GOT: "+numEvents + " - COUNTER: "+counter.get()); while(events.hasNext()) { Event e = events.nextEvent(); System.out.println(" - " + e); + if (PathUtils.getName(e.getPath()).startsWith("local")) { + if (e instanceof JackrabbitEvent && !((JackrabbitEvent)e).isExternal()) { + localCounter.incrementAndGet(); + } + } } + } catch (InterruptedException e) { + throw new Error(e); + } catch (RepositoryException e) { + throw new Error(e); } } }; - Node root = observingSession.getNode("/"); + Session session = getAdminSession(); + Node root = session.getNode("/"); root.addNode("testNode"); - observingSession.save(); + session.save(); observationManager.addEventListener(listeners, Event.PROPERTY_ADDED, "/", true, null, null, false); - + + int propCounter = 0; + // send out 6 events (or in general: queue length + 1): + // event #0 will get delivered but stalls at the listener (queue empty though) + // event #1-#5 will fill the queue - all must remain "local" + for(int i=0; iexternal", OBS_QUEUE_LENGTH+1, localCounter.get()); + + counter.set(0); + // send out 7 events (or in general: queue length + 2): // event #0 will get delivered but stalls at the listener (queue empty though) - // event #1-#6 will fill the queue - // event #7 will not fit in the queue anymore (queue full) - for(int i=0; i pass 100 - - // wait some time to allow the listener to get the events delivered - Thread.sleep(2000); - - System.out.println("COUNTER: "+counter.get()); - assertEquals(OBS_QUEUE_LENGTH + 2, counter.get()); - - root = observingSession.getNode("/"); - root.getNode("testNode").setProperty("p"+(OBS_QUEUE_LENGTH + 2), (OBS_QUEUE_LENGTH + 2)); - System.out.println("storing: /testNode/p"+(OBS_QUEUE_LENGTH + 2)); - observingSession.save(); - - Thread.sleep(1000); - assertEquals(OBS_QUEUE_LENGTH + 3, counter.get()); + + notTimedOut = waitFor(2000, new Condition() { + @Override + public boolean evaluate() { + return (OBS_QUEUE_LENGTH+2)==counter.get(); + } + }); + assertTrue("Listener didn't process " + (OBS_QUEUE_LENGTH+2) + " events within time-out", notTimedOut); + + root = session.getNode("/"); + root.getNode("testNode").setProperty("p" + propCounter, propCounter); + System.out.println("storing: /testNode/p" + propCounter); + session.save(); + + notTimedOut = waitFor(1000, new Condition() { + @Override + public boolean evaluate() { + return (OBS_QUEUE_LENGTH+3)==counter.get(); + } + }); + assertTrue("Listener didn't process " + (OBS_QUEUE_LENGTH+3) + " events within time-out", notTimedOut); } }