Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java (revision 1784018) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java (working copy) @@ -110,12 +110,6 @@ private ContentChange last; /** - * Flag to indicate that some content changes were dropped because - * the queue was full. - */ - private boolean full; - - /** * Current background task */ private volatile NotifyingFutureTask currentTask = NotifyingFutureTask.completed(); @@ -283,28 +277,30 @@ //TODO - Support for merging ChangeSet for external changes queue.remove(last); - full = false; } - ContentChange change; - if (full) { - // If the queue is full, some commits have already been skipped - // so we need to drop the possible local commit information as - // only external changes can be merged together to larger chunks. - change = new ContentChange(root, CommitInfo.EMPTY_EXTERNAL); - } else { - change = new ContentChange(root, info); - } + ContentChange change = new ContentChange(root, info); - // Try to add this change to the queue without blocking, and - // mark the queue as full if there wasn't enough space - full = !queue.offer(change); + // Try to add this change to the queue without blocking + boolean full = !queue.offer(change); - if (!full) { - // Keep track of the last change added, so we can do the - // compacting of external changes shown above. - last = change; + if (full && last != null) { // last is only null at the beginning + // queue is full. + + // when the change can't be added to the queue because it's full + // remove the last entry and add an explicit overflow entry instead. + queue.remove(last); + + // by removing the last entry we have to drop the possible + // local commit information of the current change, + // as we're doing collapsing here and the commit information + // no longer represents an individual commit + change = new ContentChange(root, CommitInfo.EMPTY_EXTERNAL); + queue.offer(change); } + // Keep track of the last change added, so we can do the + // compacting of external changes shown above. + last = change; // Set the completion handler on the currently running task. Multiple calls // to onComplete are not a problem here since we always pass the same value. Index: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java (revision 1784018) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java (working copy) @@ -338,7 +338,7 @@ // this one will be queued as #2 NodeState thirdIncluded = generator.next(); - expected.add(new Pair(secondIncluded, thirdIncluded)); +// expected.add(new Pair(secondIncluded, thirdIncluded)); fo.contentChanged(thirdIncluded, CommitInfo.EMPTY); // this one will cause the queue to 'overflow' (full==true) @@ -358,6 +358,10 @@ next = generator.next(); // excluded==false BUT queue full, hence not adding to expected fo.contentChanged(next, CommitInfo.EMPTY); + + // with OAK-5740 the overflow entry now looks as follows: + expected.add(new Pair(secondIncluded, next)); + // let recorder continue recorder.unpause(); @@ -376,10 +380,10 @@ // only happens with non-filtered items, so adding yet another one now filter.excludeNext(false); NodeState last = generator.next(); - // while above the "seventhAfterQueueFull" DOES get filtered, the next contentChange - // triggers the release of the 'queue full overflow element' (with commitInfo==null) - // and that we must add as expected() - expected.add(new Pair(thirdIncluded, seventhAfterQueueFull)); // commitInfo == null + // the 'seventhAfterQueueFull' DOES get filtered - and as per behavior + // pre-OAK-5740 it used to get flushed with the next contentChanged, + // however, with OAK-5740 this is no longer the case as we now + // use the last queue entry as the overflow entry expected.add(new Pair(seventhAfterQueueFull, last)); fo.contentChanged(last, CommitInfo.EMPTY); Index: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java (revision 1784018) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java (working copy) @@ -246,10 +246,11 @@ new TestPattern(INCLUDED, 5, false, 0, 0, 0, 6), // here: 1 init and 5 changes are in the queue, the queue fits 7, so queue is almost full new TestPattern(EXCLUDED, 500, false, 0, 0, 6, 6), - // still 6 in the queue, of 7 + // still 6 in the queue, of 7 + // due to OAK-5740 the last entry is now an include new TestPattern(INCLUDED, 5, false, 0, 0, 6, 7), - // now we added 2 (one NOOP and one of those 5), so the queue got full (==7) - new TestPattern(EXCLUDED, 0 /* only flush*/, true, 5, 0, 7, 0) + // so with OAK-5740 we now will get 6 includes, not 5 + new TestPattern(EXCLUDED, 0 /* only flush*/, true, 6, 0, 7, 0) ); } Index: oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java =================================================================== --- oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java (revision 1784018) +++ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java (working copy) @@ -1,261 +1,321 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.jackrabbit.oak.jcr.observation; - -import ch.qos.logback.classic.Level; -import org.apache.jackrabbit.api.JackrabbitRepository; -import org.apache.jackrabbit.oak.commons.junit.LogCustomizer; -import org.apache.jackrabbit.oak.fixture.NodeStoreFixture; -import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest; -import org.apache.jackrabbit.oak.jcr.Jcr; -import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl; -import org.apache.jackrabbit.oak.stats.Clock; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jcr.Node; -import javax.jcr.RepositoryException; -import javax.jcr.Session; -import javax.jcr.SimpleCredentials; -import javax.jcr.observation.Event; -import javax.jcr.observation.EventIterator; -import javax.jcr.observation.EventListener; -import javax.jcr.observation.ObservationManager; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import static javax.jcr.observation.Event.NODE_ADDED; -import static org.junit.Assert.assertTrue; - -@RunWith(Parameterized.class) -public class ObservationQueueFullWarnTest extends AbstractRepositoryTest { - private static final int OBS_QUEUE_LENGTH = 5; - private static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted."; - - private static final String TEST_NODE = "test_node"; - private static final String TEST_NODE_TYPE = "oak:Unstructured"; - private static final String TEST_PATH = '/' + TEST_NODE; - - private static final long OBS_TIMEOUT_PER_ITEM = 1000; - private static final long CONDITION_TIMEOUT = OBS_QUEUE_LENGTH * OBS_TIMEOUT_PER_ITEM; - - private Session observingSession; - private ObservationManager observationManager; - - private final BlockableListener listener = new BlockableListener(); - - private static final Logger LOG = LoggerFactory.getLogger(ObservationQueueFullWarnTest.class); - - private final Semaphore blockObservation = new Semaphore(1); - - private final AtomicInteger numAddedNodes = new AtomicInteger(0); - private final AtomicInteger numObservedNodes = new AtomicInteger(0); - - public ObservationQueueFullWarnTest(NodeStoreFixture fixture) { - super(fixture); - LOG.info("fixture: {}", fixture); - } - - @Override - protected Jcr initJcr(Jcr jcr) { - return jcr.withObservationQueueLength(OBS_QUEUE_LENGTH); - } - - @Before - public void setup() throws RepositoryException { - Session session = getAdminSession(); - - session.getRootNode().addNode(TEST_NODE, TEST_NODE_TYPE); - session.save(); - - Map attrs = new HashMap<>(); - attrs.put(RepositoryImpl.REFRESH_INTERVAL, 0); - observingSession = ((JackrabbitRepository) getRepository()).login(new SimpleCredentials("admin", "admin".toCharArray()), null, attrs); - observationManager = observingSession.getWorkspace().getObservationManager(); - } - - @After - public void tearDown() { - observingSession.logout(); - } - - @Test - public void warnOnQueueFull() throws RepositoryException, InterruptedException, ExecutionException { - LogCustomizer customLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName()) - .filter(Level.WARN) - .contains(OBS_QUEUE_FULL_WARN) - .create(); - - observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false); - try { - customLogs.starting(); - addNodeToFillObsQueue(); - assertTrue("Observation queue full warning must get logged", customLogs.getLogs().size() > 0); - customLogs.finished(); - } finally { - observationManager.removeEventListener(listener); - } - } - - @Test - public void warnOnRepeatedQueueFull() throws RepositoryException, InterruptedException, ExecutionException { - LogCustomizer warnLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName()) - .filter(Level.WARN) - .contains(OBS_QUEUE_FULL_WARN) - .create(); - LogCustomizer debugLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName()) - .filter(Level.DEBUG) - .contains(OBS_QUEUE_FULL_WARN) - .create(); - LogCustomizer logLevelSetting = LogCustomizer.forLogger(ChangeProcessor.class.getName()) - .enable(Level.DEBUG) - .create(); - logLevelSetting.starting(); - - long oldWarnLogInterval = ChangeProcessor.QUEUE_FULL_WARN_INTERVAL; - //Assumption is that 10 (virtual) minutes won't pass by the time we move from one stage of queue fill to next. - ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(10); - - Clock oldClockInstance = ChangeProcessor.clock; - Clock virtualClock = new Clock.Virtual(); - ChangeProcessor.clock = virtualClock; - virtualClock.waitUntil(System.currentTimeMillis()); - - observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false); - try { - //Create first level WARN message - addNodeToFillObsQueue(); - emptyObsQueue(); - - //Don't wait, fill up the queue again - warnLogs.starting(); - debugLogs.starting(); - addNodeToFillObsQueue(); - assertTrue("Observation queue full warning must not logged until some time has past since last log", - warnLogs.getLogs().size() == 0); - assertTrue("Observation queue full warning should get logged on debug though in the mean time", - debugLogs.getLogs().size() > 0); - warnLogs.finished(); - debugLogs.finished(); - emptyObsQueue(); - - //Wait some time so reach WARN level again - virtualClock.waitUntil(virtualClock.getTime() + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL); - - warnLogs.starting(); - debugLogs.starting(); - addNodeToFillObsQueue(); - assertTrue("Observation queue full warning must get logged after some time has past since last log", - warnLogs.getLogs().size() > 0); - warnLogs.finished(); - debugLogs.finished(); - } finally { - observationManager.removeEventListener(listener); - ChangeProcessor.clock = oldClockInstance; - ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = oldWarnLogInterval; - - logLevelSetting.finished(); - } - } - - private void addANode(String prefix) throws RepositoryException { - Session session = getAdminSession(); - Node parent = session.getNode(TEST_PATH); - String nodeName = prefix + numAddedNodes.get(); - parent.addNode(nodeName); - session.save(); - numAddedNodes.incrementAndGet(); - } - - private void addNodeToFillObsQueue() - throws RepositoryException { - blockObservation.acquireUninterruptibly(); - try { - for (int i = 0; i <= OBS_QUEUE_LENGTH; i++) { - addANode("n"); - } - } finally { - blockObservation.release(); - } - } - - private interface Condition { - boolean evaluate(); - } - - private boolean waitFor(long timeout, Condition c) - throws InterruptedException { - long end = System.currentTimeMillis() + timeout; - long remaining = end - System.currentTimeMillis(); - while (remaining > 0) { - if (c.evaluate()) { - return true; - } - - //Add another node only when num_pending_to_be_observed nodes is - //less that observation queue. This is done to let all observation finish - //up in case last few event were dropped due to full observation queue - //(which is ok as the next event that comes in gets diff-ed with last - //processed revision) - if (numAddedNodes.get() < numObservedNodes.get() + OBS_QUEUE_LENGTH) { - try { - addANode("addedWhileWaiting"); - } catch (RepositoryException e) { - LOG.warn("exception while adding during wait: {}", e); - } - } - Thread.sleep(OBS_TIMEOUT_PER_ITEM/10);//The constant is exaggerated - remaining = end - System.currentTimeMillis(); - } - return c.evaluate(); - } - - private void emptyObsQueue() throws InterruptedException { - boolean notTimedOut = waitFor(CONDITION_TIMEOUT, new Condition() { - @Override - public boolean evaluate() { - return numObservedNodes.get()==numAddedNodes.get(); - } - }); - assertTrue("Listener didn't process events within time-out", notTimedOut); - } - - private class BlockableListener implements EventListener { - @Override - public void onEvent(EventIterator events) { - blockObservation.acquireUninterruptibly(); - while (events.hasNext()) { - Event event = events.nextEvent(); - if (event.getType() == Event.NODE_ADDED) { - numObservedNodes.incrementAndGet(); - } - } - blockObservation.release(); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.jcr.observation; + +import ch.qos.logback.classic.Level; +import org.apache.jackrabbit.api.JackrabbitRepository; +import org.apache.jackrabbit.oak.commons.junit.LogCustomizer; +import org.apache.jackrabbit.oak.fixture.NodeStoreFixture; +import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest; +import org.apache.jackrabbit.oak.jcr.Jcr; +import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl; +import org.apache.jackrabbit.oak.stats.Clock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jcr.Node; +import javax.jcr.RepositoryException; +import javax.jcr.Session; +import javax.jcr.SimpleCredentials; +import javax.jcr.observation.Event; +import javax.jcr.observation.EventIterator; +import javax.jcr.observation.EventListener; +import javax.jcr.observation.ObservationManager; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static javax.jcr.observation.Event.NODE_ADDED; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class ObservationQueueFullWarnTest extends AbstractRepositoryTest { + private static final int OBS_QUEUE_LENGTH = 5; + private static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted."; + + private static final String TEST_NODE = "test_node"; + private static final String TEST_NODE_TYPE = "oak:Unstructured"; + private static final String TEST_PATH = '/' + TEST_NODE; + + private static final long OBS_TIMEOUT_PER_ITEM = 1000; + private static final long CONDITION_TIMEOUT = OBS_QUEUE_LENGTH * OBS_TIMEOUT_PER_ITEM; + + private Session observingSession; + private ObservationManager observationManager; + + private final BlockableListener listener = new BlockableListener(); + + private static final Logger LOG = LoggerFactory.getLogger(ObservationQueueFullWarnTest.class); + + private final Semaphore blockObservation = new Semaphore(1); + + private final AtomicInteger numAddedNodes = new AtomicInteger(0); + private final AtomicInteger numObservedNodes = new AtomicInteger(0); + + public ObservationQueueFullWarnTest(NodeStoreFixture fixture) { + super(fixture); + LOG.info("fixture: {}", fixture); + } + + @Override + protected Jcr initJcr(Jcr jcr) { + return jcr.withObservationQueueLength(OBS_QUEUE_LENGTH); + } + + @Before + public void setup() throws RepositoryException { + Session session = getAdminSession(); + + session.getRootNode().addNode(TEST_NODE, TEST_NODE_TYPE); + session.save(); + + Map attrs = new HashMap<>(); + attrs.put(RepositoryImpl.REFRESH_INTERVAL, 0); + observingSession = ((JackrabbitRepository) getRepository()).login(new SimpleCredentials("admin", "admin".toCharArray()), null, attrs); + observationManager = observingSession.getWorkspace().getObservationManager(); + } + + @After + public void tearDown() { + observingSession.logout(); + } + + @Test + public void warnOnQueueFull() throws RepositoryException, InterruptedException, ExecutionException { + LogCustomizer customLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName()) + .filter(Level.WARN) + .contains(OBS_QUEUE_FULL_WARN) + .create(); + + observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false); + try { + customLogs.starting(); + addNodeToFillObsQueue(); + assertTrue("Observation queue full warning must get logged", customLogs.getLogs().size() > 0); + customLogs.finished(); + } finally { + observationManager.removeEventListener(listener); + } + } + + @Test + public void warnOnRepeatedQueueFull() throws RepositoryException, InterruptedException, ExecutionException { + LogCustomizer warnLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName()) + .filter(Level.WARN) + .contains(OBS_QUEUE_FULL_WARN) + .create(); + LogCustomizer debugLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName()) + .filter(Level.DEBUG) + .contains(OBS_QUEUE_FULL_WARN) + .create(); + LogCustomizer logLevelSetting = LogCustomizer.forLogger(ChangeProcessor.class.getName()) + .enable(Level.DEBUG) + .create(); + logLevelSetting.starting(); + + long oldWarnLogInterval = ChangeProcessor.QUEUE_FULL_WARN_INTERVAL; + //Assumption is that 10 (virtual) minutes won't pass by the time we move from one stage of queue fill to next. + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(10); + + Clock oldClockInstance = ChangeProcessor.clock; + Clock virtualClock = new Clock.Virtual(); + ChangeProcessor.clock = virtualClock; + virtualClock.waitUntil(System.currentTimeMillis()); + + observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false); + try { + //Create first level WARN message + addNodeToFillObsQueue(); + emptyObsQueue(); + + //Don't wait, fill up the queue again + warnLogs.starting(); + debugLogs.starting(); + addNodeToFillObsQueue(); + assertTrue("Observation queue full warning must not logged until some time has past since last log", + warnLogs.getLogs().size() == 0); + assertTrue("Observation queue full warning should get logged on debug though in the mean time", + debugLogs.getLogs().size() > 0); + warnLogs.finished(); + debugLogs.finished(); + emptyObsQueue(); + + //Wait some time so reach WARN level again + virtualClock.waitUntil(virtualClock.getTime() + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL); + + warnLogs.starting(); + debugLogs.starting(); + addNodeToFillObsQueue(); + assertTrue("Observation queue full warning must get logged after some time has past since last log", + warnLogs.getLogs().size() > 0); + warnLogs.finished(); + debugLogs.finished(); + } finally { + observationManager.removeEventListener(listener); + ChangeProcessor.clock = oldClockInstance; + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = oldWarnLogInterval; + + logLevelSetting.finished(); + } + } + + private void addANode(String prefix) throws RepositoryException { + Session session = getAdminSession(); + Node parent = session.getNode(TEST_PATH); + String nodeName = prefix + numAddedNodes.get(); + parent.addNode(nodeName); + session.save(); + numAddedNodes.incrementAndGet(); + } + + private void addNodeToFillObsQueue() + throws RepositoryException { + blockObservation.acquireUninterruptibly(); + try { + for (int i = 0; i <= OBS_QUEUE_LENGTH; i++) { + addANode("n"); + } + } finally { + blockObservation.release(); + } + } + + private interface Condition { + boolean evaluate(); + } + + private boolean waitFor(long timeout, Condition c) + throws InterruptedException { + long end = System.currentTimeMillis() + timeout; + long remaining = end - System.currentTimeMillis(); + while (remaining > 0) { + if (c.evaluate()) { + return true; + } + + //Add another node only when num_pending_to_be_observed nodes is + //less that observation queue. This is done to let all observation finish + //up in case last few event were dropped due to full observation queue + //(which is ok as the next event that comes in gets diff-ed with last + //processed revision) + if (numAddedNodes.get() < numObservedNodes.get() + OBS_QUEUE_LENGTH) { + try { + addANode("addedWhileWaiting"); + } catch (RepositoryException e) { + LOG.warn("exception while adding during wait: {}", e); + } + } + Thread.sleep(OBS_TIMEOUT_PER_ITEM/10);//The constant is exaggerated + remaining = end - System.currentTimeMillis(); + } + return c.evaluate(); + } + + private void emptyObsQueue() throws InterruptedException { + boolean notTimedOut = waitFor(CONDITION_TIMEOUT, new Condition() { + @Override + public boolean evaluate() { + return numObservedNodes.get()==numAddedNodes.get(); + } + }); + assertTrue("Listener didn't process events within time-out", notTimedOut); + } + + private class BlockableListener implements EventListener { + @Override + public void onEvent(EventIterator events) { + blockObservation.acquireUninterruptibly(); + while (events.hasNext()) { + Event event = events.nextEvent(); + if (event.getType() == Event.NODE_ADDED) { + numObservedNodes.incrementAndGet(); + } + } + blockObservation.release(); + } + } + + @Test + public void testQueueFullThenFlushing() throws Exception { + final Semaphore semaphore = new Semaphore(0); + final AtomicLong counter = new AtomicLong(0); + EventListener listeners = new EventListener() { + + @Override + public void onEvent(EventIterator events) { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + throw new Error(e); + } finally { + long numEvents = events.getSize(); + counter.addAndGet(numEvents); + System.out.println("GOT: "+numEvents + " - COUNTER: "+counter.get()); + while(events.hasNext()) { + Event e = events.nextEvent(); + System.out.println(" - " + e); + } + } + } + }; + Node root = observingSession.getNode("/"); + root.addNode("testNode"); + observingSession.save(); + + observationManager.addEventListener(listeners, Event.PROPERTY_ADDED, "/", true, null, null, false); + + // send out 7 events (or in general: queue length + 2): + // event #0 will get delivered but stalls at the listener (queue empty though) + // event #1-#6 will fill the queue + // event #7 will not fit in the queue anymore (queue full) + for(int i=0; i pass 100 + + // wait some time to allow the listener to get the events delivered + Thread.sleep(2000); + + System.out.println("COUNTER: "+counter.get()); + assertEquals(OBS_QUEUE_LENGTH + 2, counter.get()); + + root = observingSession.getNode("/"); + root.getNode("testNode").setProperty("p"+(OBS_QUEUE_LENGTH + 2), (OBS_QUEUE_LENGTH + 2)); + System.out.println("storing: /testNode/p"+(OBS_QUEUE_LENGTH + 2)); + observingSession.save(); + + Thread.sleep(1000); + assertEquals(OBS_QUEUE_LENGTH + 3, counter.get()); + } +}