Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java	(revision 1763448)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java	(working copy)
@@ -30,16 +30,18 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import com.google.common.base.Predicate;
 import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Predicate;
+
 /**
  * An observer that uses a change queue and a background thread to forward
  * content changes to another observer. The mechanism is designed so that
@@ -53,17 +55,25 @@
  * (see {@code alwaysCollapseExternalEvents} and {@code oak.observation.alwaysCollapseExternal})
  * automatically merged to just one change.
  */
-public class BackgroundObserver implements Observer, Closeable {
+public class BackgroundObserver implements FilteringAwareObserver, Closeable {
 
     /**
      * Signal for the background thread to stop processing changes.
      */
-    private static final ContentChange STOP = new ContentChange(null, null);
+    private static final ContentChange STOP = new ContentChange(null, null, null);
 
     /**
      * The receiving observer being notified off the background thread.
      */
     private final Observer observer;
+    
+    /**
+     * If the provided Observer implements FilteringAwareObserver
+     * this field is equivalent to observer (otherwise null).
+     * Filtering is only supported if the provided Observer
+     * is of type FilteringAwareObserver
+     */
+    private final FilteringAwareObserver filteringAwareObserver;
 
     /**
      * Executor used to dispatch events
@@ -92,10 +102,12 @@
             Boolean.parseBoolean(System.getProperty("oak.observation.alwaysCollapseExternal", "false"));
 
     private static class ContentChange {
+        private final NodeState previousRootOrNull;
         private final NodeState root;
         private final CommitInfo info;
         private final long created = System.currentTimeMillis();
-        ContentChange(NodeState root, CommitInfo info) {
+        ContentChange(NodeState previousRootOrNull, NodeState root, CommitInfo info) {
+            this.previousRootOrNull = previousRootOrNull;
             this.root = root;
             this.info = info;
         }
@@ -108,6 +120,15 @@
     private ContentChange last;
 
     /**
+     * If non-null indicates that the last call to
+     * the BackgroundObserver was a resetPreviousRoot
+     * and contains that previous root then.
+     * If null indicates that the last call to 
+     * the BackgroundObserver was a contentChanged.
+     */
+    private NodeState excludedRoot;
+    
+    /**
      * Flag to indicate that some content changes were dropped because
      * the queue was full.
      */
@@ -129,6 +150,19 @@
                 try {
                     ContentChange change = queue.poll();
                     if (change != null && change != STOP) {
+                        if (change.previousRootOrNull != null) {
+                            // a ContentChange that carries a previousRoot
+                            // indicates a NOOP change
+                            // which is signalled to the FilteringAwareObserver
+                            // via reset
+                            if (filteringAwareObserver != null) {
+                                // normal case
+                                filteringAwareObserver.resetPreviousRoot(change.previousRootOrNull);
+                            } else {
+                                // this should never happen case
+                                observer.contentChanged(change.previousRootOrNull, CommitInfo.EMPTY);
+                            }
+                        }
                         observer.contentChanged(change.root, change.info);
                         removed(queue.size(), change.created);
                         currentTask.onComplete(completionHandler);
@@ -158,6 +192,11 @@
             int queueLength,
             @Nonnull UncaughtExceptionHandler exceptionHandler) {
         this.observer = checkNotNull(observer);
+        if (observer instanceof FilteringAwareObserver) {
+            this.filteringAwareObserver = (FilteringAwareObserver) observer;            
+        } else {
+            this.filteringAwareObserver = null;
+        }
         this.executor = checkNotNull(executor);
         this.exceptionHandler = checkNotNull(exceptionHandler);
         this.maxQueueLength = queueLength;
@@ -262,6 +301,15 @@
         };
     }
 
+    //--------------------------------------------< FilteringAwareObserver >--
+    
+    public void resetPreviousRoot(NodeState root) {
+        checkState(!stopped);
+        checkNotNull(root);
+        
+        this.excludedRoot = root;
+    };
+
     //----------------------------------------------------------< Observer >--
 
     /**
@@ -272,11 +320,14 @@
         checkState(!stopped);
         checkNotNull(root);
 
-        if (alwaysCollapseExternalEvents && info == null && last != null && last.info == null) {
+        if (excludedRoot==null && alwaysCollapseExternalEvents && info == null && last != null && last.info == null) {
             // This is an external change. If the previous change was
             // also external, we can drop it from the queue (since external
             // changes in any case can cover multiple commits) to help
             // prevent the queue from filling up too fast.
+
+            // this is not applicable if the last change was excluded though
+            // as that would violate the include/exclude boundaries
             queue.remove(last);
             full = false;
         }
@@ -286,9 +337,12 @@
             // If the queue is full, some commits have already been skipped
             // so we need to drop the possible local commit information as
             // only external changes can be merged together to larger chunks.
-            change = new ContentChange(root, null);
+            
+            // also: if the queue was full, filtering cannot be applied,
+            // therefore passing null as the excludedRoot here
+            change = new ContentChange(null, root, null);
         } else {
-            change = new ContentChange(root, info);
+            change = new ContentChange(this.excludedRoot, root, info);
         }
 
         // Try to add this change to the queue without blocking, and
@@ -306,6 +360,9 @@
         // Thus there is no question as to which of the handlers will effectively run.
         currentTask.onComplete(completionHandler);
         added(queue.size());
+
+        // reset the excludedRoot flag
+        this.excludedRoot = null;
     }
 
     //------------------------------------------------------------< internal >---
@@ -313,4 +370,23 @@
     private static Logger getLogger(@Nonnull Observer observer) {
         return LoggerFactory.getLogger(checkNotNull(observer).getClass());
     }
+    
+    /** FOR TESTING ONLY 
+     * @throws InterruptedException **/
+    protected boolean waitUntilStopped(int timeout, TimeUnit unit) throws InterruptedException {
+    	long done = System.currentTimeMillis() + unit.toMillis(timeout);
+    	synchronized(this) {
+    	    queue.add(STOP);
+            currentTask.onComplete(completionHandler);
+    	}
+    	while(done > System.currentTimeMillis()) {
+    		synchronized(this) {
+    			if (queue.size() == 0) {
+    				return true;
+    			}
+    			wait(1);
+    		}
+    	}
+    	return false;
+    }
 }
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/FilteringAwareObserver.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/FilteringAwareObserver.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/FilteringAwareObserver.java	(working copy)
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+/**
+ * Extension of an Observer which is aware and capable of handling filtering of
+ * content changes. That is, when a upstream filters one or a few content
+ * changes it signals this to a FilteringAwareObserver by calling
+ * resetPreviousRoot(NodeState). This indicates that the previous root (which
+ * all Observers are typically handling one way or another) should be set to the
+ * value provided (typically this corresponds to a fast-forward).
+ * <p>
+ * In a chain of Observers once an Observer implements a FilteringAwareObserver
+ * this typically requires all downstream Observers to implement this too - as
+ * otherwise there's no way of signaling filtering downstream.
+ */
+public interface FilteringAwareObserver extends Observer {
+
+    /**
+     * Invoked to enable such observers to reset there previous root to given
+     * NodeState
+     * 
+     * @param root
+     *            previous NodeState root
+     */
+    void resetPreviousRoot(NodeState root);
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/FilteringAwareObserver.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/FilteringObserver.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/FilteringObserver.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/FilteringObserver.java	(working copy)
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+/**
+ * A FilteringObserver is the entry point for a chain of Observers that
+ * implement the extension FilteringAwareObserver.
+ * <p>
+ * The FilteringObserver is abstract and should be extended with the isExcluded
+ * method - which decides if an element should be passed on to the downstream
+ * observer or not. If it isn't passed (ie it's filtered out) then upon the next
+ * non-filtered contentChanged call a resetPreviousRoot is first done.
+ * <p>
+ * The idea is to chain this FilteringObserver in front of eg a
+ * BackgroundObserver.
+ */
+abstract class FilteringObserver implements Observer {
+
+    private final FilteringAwareObserver observer;
+
+    private NodeState previousRoot;
+    private NodeState excludedRoot;
+
+    public FilteringObserver(FilteringAwareObserver observer) {
+        this.observer = observer;
+    }
+
+    /**
+     * Decides whether or not to exclude the change represented by the given
+     * before, after and CommitInfo.
+     */
+    protected abstract boolean isExcluded(NodeState before, NodeState after, CommitInfo info);
+
+    @Override
+    public void contentChanged(NodeState root, CommitInfo info) {
+        if (isExcluded(previousRoot, root, info)) {
+            excludedRoot = root;
+        } else {
+            if (excludedRoot != null) {
+                observer.resetPreviousRoot(excludedRoot);
+                excludedRoot = null;
+            }
+            observer.contentChanged(root, info);
+        }
+        previousRoot = root;
+    }
+
+}
\ No newline at end of file

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/FilteringObserver.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java	(revision 1763448)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java	(working copy)
@@ -20,34 +20,47 @@
 package org.apache.jackrabbit.oak.spi.commit;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.io.Closeable;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import com.google.common.collect.Lists;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.After;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
+import junit.framework.AssertionFailedError;
+
 public class BackgroundObserverTest {
     private static final CommitInfo COMMIT_INFO = new CommitInfo("no-session", null);
     public static final int CHANGE_COUNT = 1024;
 
     private final List<Runnable> assertions = Lists.newArrayList();
     private CountDownLatch doneCounter;
+    private final List<Closeable> closeables = Lists.newArrayList();
 
     /**
      * Assert that each observer of many running concurrently sees the same
-     * linearly sequence of commits (i.e. sees the commits in the correct order).
+     * linearly sequence of commits (i.e. sees the commits in the correct
+     * order).
      */
     @Test
     public void concurrentObservers() throws InterruptedException {
@@ -99,7 +112,7 @@
         return new BackgroundObserver(new Observer() {
             // Need synchronised list here to maintain correct memory barrier
             // when this is passed on to done(List<Runnable>)
-            final List<Runnable> assertions = Collections.synchronizedList(Lists.<Runnable>newArrayList());
+            final List<Runnable> assertions = Collections.synchronizedList(Lists.<Runnable> newArrayList());
             volatile NodeState previous;
 
             @Override
@@ -126,4 +139,297 @@
         }, executor, queueLength);
     }
 
+    class MyFilteringObserver extends FilteringObserver {
+
+        private boolean excludeNext = false;
+
+        public MyFilteringObserver(FilteringAwareObserver observer) {
+            super(observer);
+        }
+
+        void excludeNext(boolean excludeNext) {
+            this.excludeNext = excludeNext;
+        }
+
+        @Override
+        protected boolean isExcluded(NodeState before, NodeState after, CommitInfo info) {
+            return excludeNext;
+        }
+
+        @Override
+        public void contentChanged(NodeState root, CommitInfo info) {
+            super.contentChanged(root, info);
+            excludeNext = false;
+        }
+
+    }
+
+    class Pair {
+        private final NodeState before;
+        private final NodeState after;
+
+        Pair(NodeState before, NodeState after) {
+            this.before = before;
+            this.after = after;
+        }
+
+        @Override
+        public String toString() {
+            return "Pair(before=" + before + ", after=" + after + ")";
+        }
+    }
+
+    class Recorder implements FilteringAwareObserver {
+
+        List<Pair> includedChanges = new LinkedList<Pair>();
+        private NodeState previousRoot;
+        private boolean pause;
+        private boolean pausing;
+
+        @Override
+        public void contentChanged(NodeState root, CommitInfo info) {
+            includedChanges.add(new Pair(previousRoot, root));
+            previousRoot = root;
+            maybePause();
+        }
+
+        @Override
+        public void resetPreviousRoot(NodeState root) {
+            previousRoot = root;
+            maybePause();
+        }
+
+        public void maybePause() {
+            synchronized (this) {
+                try {
+                    while (pause) {
+                        pausing = true;
+                        this.notifyAll();
+                        try {
+                            this.wait();
+                        } catch (InterruptedException e) {
+                            // should not happen
+                        }
+                    }
+                } finally {
+                    pausing = false;
+                    this.notifyAll();
+                }
+            }
+        }
+
+        public synchronized void pause() {
+            this.pause = true;
+        }
+
+        public synchronized void unpause() {
+            this.pause = false;
+            this.notifyAll();
+        }
+
+        public boolean waitForPausing(int timeout, TimeUnit unit) throws InterruptedException {
+            final long done = System.currentTimeMillis() + unit.toMillis(timeout);
+            synchronized (this) {
+                while (!pausing && done > System.currentTimeMillis()) {
+                    this.wait();
+                }
+                return pausing;
+            }
+        }
+
+        public boolean waitForUnpausing(int timeout, TimeUnit unit) throws InterruptedException {
+            final long done = System.currentTimeMillis() + unit.toMillis(timeout);
+            synchronized (this) {
+                while (pausing && done > System.currentTimeMillis()) {
+                    this.wait();
+                }
+                return !pausing;
+            }
+        }
+
+    }
+
+    class NodeStateGenerator {
+        Random r = new Random(1232131); // seed: repeatable tests
+        NodeBuilder builder = EMPTY_NODE.builder();
+
+        NodeState next() {
+            builder.setProperty("p", r.nextInt());
+            NodeState result = builder.getNodeState();
+            builder = result.builder();
+            return result;
+        }
+    }
+
+    private void assertMatches(String msg, List<Pair> expected, List<Pair> actual) {
+        assertEquals("size mismatch. msg=" + msg, expected.size(), actual.size());
+        for (int i = 0; i < expected.size(); i++) {
+            assertSame("mismatch of before at pos=" + i + ", msg=" + msg, expected.get(i).before, actual.get(i).before);
+            assertSame("mismatch of after at pos=" + i + ", msg=" + msg, expected.get(i).after, actual.get(i).after);
+        }
+    }
+
+    @After
+    public void shutDown() throws Exception {
+        for (Closeable closeable : closeables) {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                throw new AssertionFailedError(e.getMessage());
+            }
+        }
+    }
+
+    @Test
+    public void testExcludedAllCommits() throws Exception {
+        Recorder recorder = new Recorder();
+        ExecutorService executor = newSingleThreadExecutor();
+        BackgroundObserver bo = new BackgroundObserver(recorder, executor, 5);
+        ;
+        MyFilteringObserver fo = new MyFilteringObserver(bo);
+        closeables.add(bo);
+        List<Pair> expected = new LinkedList<Pair>();
+        NodeStateGenerator generator = new NodeStateGenerator();
+        NodeState first = generator.next();
+        expected.add(new Pair(null, first));
+        fo.contentChanged(first, CommitInfo.EMPTY);
+        for (int i = 0; i < 100000; i++) {
+            fo.excludeNext(true);
+            fo.contentChanged(generator.next(), CommitInfo.EMPTY);
+        }
+        assertTrue("testExcludedAllCommits", bo.waitUntilStopped(5, TimeUnit.SECONDS));
+        assertMatches("testExcludedAllCommits", expected, recorder.includedChanges);
+    }
+
+    @Test
+    public void testNoExcludedCommits() throws Exception {
+        Recorder recorder = new Recorder();
+        ExecutorService executor = newSingleThreadExecutor();
+        BackgroundObserver bo = new BackgroundObserver(recorder, executor, 10002);
+        MyFilteringObserver fo = new MyFilteringObserver(bo);
+        closeables.add(bo);
+        List<Pair> expected = new LinkedList<Pair>();
+        NodeStateGenerator generator = new NodeStateGenerator();
+        NodeState first = generator.next();
+        expected.add(new Pair(null, first));
+        fo.contentChanged(first, CommitInfo.EMPTY);
+        NodeState previous = first;
+        for (int i = 0; i < 10000; i++) {
+            fo.excludeNext(false);
+            NodeState next = generator.next();
+            expected.add(new Pair(previous, next));
+            previous = next;
+            fo.contentChanged(next, CommitInfo.EMPTY);
+        }
+        assertTrue("testNoExcludedCommits", bo.waitUntilStopped(5, TimeUnit.SECONDS));
+        assertMatches("testNoExcludedCommits", expected, recorder.includedChanges);
+    }
+
+    @Test
+    public void testExcludeCommitsWithFullQueue() throws Exception {
+        Recorder recorder = new Recorder();
+        ExecutorService executor = newSingleThreadExecutor();
+        BackgroundObserver bo = new BackgroundObserver(recorder, executor, 2);
+        MyFilteringObserver fo = new MyFilteringObserver(bo);
+        closeables.add(bo);
+        List<Pair> expected = new LinkedList<Pair>();
+        NodeStateGenerator generator = new NodeStateGenerator();
+        recorder.pause();
+
+        // the first one will directly go to the recorder
+        NodeState firstIncluded = generator.next();
+        expected.add(new Pair(null, firstIncluded));
+        fo.contentChanged(firstIncluded, CommitInfo.EMPTY);
+
+        assertTrue("observer did not get called (yet?)", recorder.waitForPausing(5, TimeUnit.SECONDS));
+
+        // this one will the queued as #1
+        NodeState secondIncluded = generator.next();
+        expected.add(new Pair(firstIncluded, secondIncluded));
+        fo.contentChanged(secondIncluded, CommitInfo.EMPTY);
+
+        // this one will the queued as #2
+        NodeState thirdIncluded = generator.next();
+        expected.add(new Pair(secondIncluded, thirdIncluded));
+        fo.contentChanged(thirdIncluded, CommitInfo.EMPTY);
+
+        // this one will cause the queue to 'overflow' (full==true)
+        NodeState forthQueueFull = generator.next();
+        // not adding to expected, as this one ends up in the overflow element
+        fo.contentChanged(forthQueueFull, CommitInfo.EMPTY);
+
+        NodeState next;
+        // exclude when queue is full
+        fo.excludeNext(true);
+        next = generator.next();
+        // if excluded==true and full, hence not adding to expected
+        fo.contentChanged(next, CommitInfo.EMPTY);
+        // include after an exclude when queue was full
+        // => this is not supported. when the queue
+        fo.excludeNext(false);
+        next = generator.next();
+        // excluded==false BUT queue full, hence not adding to expected
+        fo.contentChanged(next, CommitInfo.EMPTY);
+        // let recorder continue
+        recorder.unpause();
+
+        recorder.waitForUnpausing(5, TimeUnit.SECONDS);
+        Thread.sleep(1000); // wait for 1 element to be dequeued at least
+        // exclude when queue is no longer full
+        fo.excludeNext(true);
+        NodeState seventhAfterQueueFull = generator.next();
+        // with the introduction of the FilteringAwareObserver this
+        // 'seventhAfterQueueFull' root will not be forwarded
+        // to the BackgroundObserver - thus entirely filtered
+
+        fo.contentChanged(seventhAfterQueueFull, CommitInfo.EMPTY);
+
+        // but with the introduction of FilteringAwareObserver the delivery
+        // only happens with non-filtered items, so adding yet another one now
+        fo.excludeNext(false);
+        NodeState last = generator.next();
+        expected.add(new Pair(thirdIncluded, last));
+        fo.contentChanged(last, CommitInfo.EMPTY);
+
+        assertTrue("testExcludeCommitsWithFullQueue", bo.waitUntilStopped(5, TimeUnit.SECONDS));
+        assertMatches("testExcludeCommitsWithFullQueue", expected, recorder.includedChanges);
+    }
+
+    @Test
+    public void testExcludeSomeCommits() throws Exception {
+        ExecutorService executor = newSingleThreadExecutor();
+        for (int i = 0; i < 100; i++) {
+            doTestExcludeSomeCommits(i, executor);
+        }
+        for (int i = 100; i < 10000; i += 50) {
+            doTestExcludeSomeCommits(i, executor);
+        }
+    }
+
+    private void doTestExcludeSomeCommits(int cnt, Executor executor) throws Exception {
+        Recorder recorder = new Recorder();
+        BackgroundObserver bo = new BackgroundObserver(recorder, executor, cnt + 2);
+        MyFilteringObserver fo = new MyFilteringObserver(bo);
+        closeables.add(bo);
+        List<Pair> expected = new LinkedList<Pair>();
+        Random r = new Random(2343242); // seed: repeatable tests
+        NodeStateGenerator generator = new NodeStateGenerator();
+        NodeState first = generator.next();
+        expected.add(new Pair(null, first));
+        fo.contentChanged(first, CommitInfo.EMPTY);
+        NodeState previous = first;
+        for (int i = 0; i < cnt; i++) {
+            boolean excludeNext = r.nextInt(100) < 90;
+            fo.excludeNext(excludeNext);
+            NodeState next = generator.next();
+            if (!excludeNext) {
+                expected.add(new Pair(previous, next));
+            }
+            previous = next;
+            fo.contentChanged(next, CommitInfo.EMPTY);
+        }
+        assertTrue("cnt=" + cnt, bo.waitUntilStopped(5, TimeUnit.SECONDS));
+        assertMatches("cnt=" + cnt, expected, recorder.includedChanges);
+    }
+
 }
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java	(revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java	(working copy)
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.Test;
+
+public class PrefilteringBackgroundObserverTest {
+    
+    private List<Runnable> runnableQ;
+    private ExecutorService executor;
+    private CompositeObserver compositeObserver;
+    private List<ContentChanged> received;
+    private EnqueuingObserver enqueuingObserver;
+    private BackgroundObserver backgroundObserver;
+    private FilteringObserver filteringObserver;
+    private CommitInfo includingCommitInfo = new CommitInfo(CommitInfo.OAK_UNKNOWN, CommitInfo.OAK_UNKNOWN);
+    private CommitInfo excludingCommitInfo = new CommitInfo(CommitInfo.OAK_UNKNOWN, CommitInfo.OAK_UNKNOWN);
+    private int resetCallCnt;
+    
+    public void init(int queueLength) throws Exception {
+        runnableQ = new LinkedList<Runnable>();
+        executor = new EnqueuingExecutorService(runnableQ);
+        compositeObserver = new CompositeObserver();
+        received = new LinkedList<ContentChanged>();
+        enqueuingObserver = new EnqueuingObserver(received) {
+            @Override
+            public void resetPreviousRoot(NodeState root) {
+                super.resetPreviousRoot(root);
+                resetCallCnt++;
+            }
+        };
+
+        backgroundObserver = new BackgroundObserver(enqueuingObserver, executor, queueLength);
+        filteringObserver = new FilteringObserver(backgroundObserver) {
+
+            @Override
+            protected boolean isExcluded(NodeState before, NodeState after, CommitInfo info) {
+                if (info == includingCommitInfo) {
+                    return false;
+                } else if (info == excludingCommitInfo) {
+                    return true;
+                } else if (info == null) {
+                    return false;
+                }
+                throw new IllegalStateException("only supporting include or exclude");
+            }
+            
+        };
+        compositeObserver.addObserver(filteringObserver);
+    }
+
+    private class EnqueuingObserver implements FilteringAwareObserver {
+        private final List<ContentChanged> received;
+
+        private EnqueuingObserver(List<ContentChanged> received) {
+            this.received = received;
+        }
+
+        @Override
+        public void contentChanged(@Nonnull final NodeState root, @Nullable CommitInfo info) {
+            received.add(new ContentChanged(root, info));
+        }
+
+        @Override
+        public void resetPreviousRoot(NodeState root) {
+            // nothing here
+        }
+    }
+
+    private final class EnqueuingExecutorService extends AbstractExecutorService {
+        private final List<Runnable> runnableQ;
+
+        private EnqueuingExecutorService(List<Runnable> runnableQ) {
+            this.runnableQ = runnableQ;
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            runnableQ.add(command);
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            throw new IllegalStateException("nyi");
+        }
+
+        @Override
+        public void shutdown() {
+            throw new IllegalStateException("nyi");
+        }
+
+        @Override
+        public boolean isTerminated() {
+            throw new IllegalStateException("nyi");
+        }
+
+        @Override
+        public boolean isShutdown() {
+            throw new IllegalStateException("nyi");
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            throw new IllegalStateException("nyi");
+        }
+    }
+
+    class ContentChanged {
+        NodeState root;
+        CommitInfo info;
+        ContentChanged(NodeState root, CommitInfo info) {
+            this.root = root;
+            this.info = info;
+        }
+    }
+    
+    private static void executeRunnables(final List<Runnable> runnableQ, int num) {
+        for(int i=0; i<num; i++) {
+            for (Runnable runnable : new ArrayList<Runnable>(runnableQ)) {
+                runnable.run();
+            }
+        }
+    }
+
+    private static NodeState p(int k) {
+        return EMPTY_NODE.builder().setProperty("p", k).getNodeState();
+    }
+
+    @Test
+    public void testFlipping() throws Exception {
+        final int queueLength = 2000;
+        init(queueLength);
+
+        // initialize observer with an initial contentChanged
+        // (see ChangeDispatcher#addObserver)
+        {
+            compositeObserver.contentChanged(p(-1), null);
+        }
+        // Part 1 : first run with filtersEvaluatedMapWithEmptyObservers - empty or null shouldn't matter, it's excluded in both cases
+        for (int k = 0; k < 1000; k++) {
+            CommitInfo info;
+            if (k%2==1) {
+                info = includingCommitInfo;
+            } else {
+                info = excludingCommitInfo;
+            }
+            final NodeState p = p(k);
+            compositeObserver.contentChanged(p, info);
+            if (k%10 == 0) {
+                executeRunnables(runnableQ, 10);
+            }
+        }
+        executeRunnables(runnableQ, 10);
+        
+        assertEquals(501, received.size());
+        assertEquals(500, resetCallCnt);
+        
+        // Part 2 : run with filtersEvaluatedMapWithNullObservers - empty or null shouldn't matter, it's excluded in both cases
+        received.clear();
+        resetCallCnt = 0;
+        for (int k = 0; k < 1000; k++) {
+            CommitInfo info;
+            if (k%2==1) {
+                info = includingCommitInfo;
+            } else {
+                info = excludingCommitInfo;
+            }
+            final NodeState p = p(k);
+            compositeObserver.contentChanged(p, info);
+            if (k%10 == 0) {
+                executeRunnables(runnableQ, 10);
+            }
+        }
+        executeRunnables(runnableQ, 10);
+        
+        assertEquals(500, received.size());
+        assertEquals(500, resetCallCnt);
+        
+        // Part 3 : unlike the method name suggests, this variant tests with the filter disabled, so should receive all events normally
+        received.clear();
+        resetCallCnt = 0;
+        for (int k = 0; k < 1000; k++) {
+            CommitInfo info;
+            if (k%2==1) {
+                info = includingCommitInfo;
+            } else {
+                info = includingCommitInfo;
+            }
+            final NodeState p = p(k);
+            compositeObserver.contentChanged(p, info);
+            if (k%10 == 0) {
+                executeRunnables(runnableQ, 10);
+            }
+        }
+        executeRunnables(runnableQ, 10);
+        
+        assertEquals(1000, received.size());
+        assertEquals(0, resetCallCnt);
+    }
+
+    @Test
+    public void testFlipping2() throws Exception {
+        doTestFullQueue(6, 
+                new TestPattern(false, 1, true, 1, 0),
+                new TestPattern(true, 5, true, 0, 0),
+                new TestPattern(false, 2, true, 2, 1),
+                new TestPattern(true, 1, true, 0, 0),
+                new TestPattern(false, 2, true, 2, 1));
+    }
+    
+    @Test
+    public void testQueueNotFull() throws Exception {
+        doTestFullQueue(20, 
+                // start: empty queue
+                new TestPattern(true, 1000, false, 0, 0),
+                // here: still empty, just the previousRoot is set to remember above NOOPs
+                new TestPattern(false, 5, false, 0, 0),
+                // here: 5 changes are in the queue, the queue fits 20, way to go
+                new TestPattern(true, 500, false, 0, 0),
+                // still 5 in the queue
+                new TestPattern(false, 5, false, 0, 0),
+                // now we added 2, queue still not full
+                new TestPattern(true, 0 /* only flush*/, true, 10, 2)
+                );
+    }
+    
+    @Test
+    public void testIncludeOnQueueFull() throws Exception {
+        doTestFullQueue(6, 
+                // start: empty queue
+                new TestPattern(true, 1000, false, 0, 0),
+                // here: still empty, just the previousRoot is set to remember above NOOPs
+                new TestPattern(false, 5, false, 0, 0),
+                // here: 5 changes are in the queue, the queue fits 6, so almost full
+                new TestPattern(true, 500, false, 0, 0),
+                // still 5 in the queue, of 6
+                new TestPattern(false, 5, false, 0, 0),
+                // now we added 2, so the queue got full
+                new TestPattern(true, 0 /* only flush*/, true, 6, 2)
+                );
+    }
+    
+    @Test
+    public void testExcludeOnQueueFull2() throws Exception {
+        doTestFullQueue(1,
+                // start: empty queue
+                new TestPattern(false, 10, false, 0, 0),
+                new TestPattern(true, 0 /* only flush*/, true, 1, 0),
+                new TestPattern(false, 10, false, 0, 0),
+                new TestPattern(true, 10, false, 0, 0),
+                new TestPattern(false, 10, false, 0, 0),
+                new TestPattern(true, 0 /* only flush*/, true, 1, 0),
+                new TestPattern(false, 10, false, 0, 0),
+                new TestPattern(true, 10, false, 0, 0),
+                new TestPattern(true, 0 /* only flush*/, true, 1, 0),
+                new TestPattern(true, 10, false, 0, 0),
+                new TestPattern(false, 10, false, 0, 0),
+                new TestPattern(true, 0 /* only flush*/, true, 1, 0));
+    }
+
+    @Test
+    public void testExcludeOnQueueFull1() throws Exception {
+        doTestFullQueue(3, 
+                // start: empty queue
+                new TestPattern(true, 1, false, 0, 0),
+                // here: still empty, just the previousRoot is set to remember above NOOP
+                new TestPattern(false, 3, false, 0, 0),
+                // here: 3 changes are in the queue, the queue fits 3, so it just got full now
+                new TestPattern(true, 1, false, 0, 0),
+                // still full but it's ignored, so doesn't have any queue length effect
+                new TestPattern(false, 3, false, 0, 0),
+                // adding 3 will not work, it will result in an overflow entry
+                new TestPattern(true, 0 /* only flush*/, true, 3, 1),
+                new TestPattern(false, 1, false, 0, 0),
+                new TestPattern(true, 0 /* only flush*/, true, 1, 0)
+                );
+    }
+
+    class TestPattern {
+        final boolean flush;
+        final boolean excluded;
+        final int numEvents;
+        final int expectedNumEvents;
+        final int expectedNumResetCalls;
+        TestPattern(boolean excluded, int numEvents, boolean flush, int expectedNumEvents, int expectedNumResetCalls) {
+            this.flush = flush;
+            this.excluded = excluded;
+            this.numEvents = numEvents;
+            this.expectedNumEvents = expectedNumEvents;
+            this.expectedNumResetCalls = expectedNumResetCalls;
+        }
+        
+        @Override
+        public String toString() {
+            return "excluded="+excluded+", numEvents="+numEvents+", flush="+flush+", expectedNumEvents="+expectedNumEvents+", expectedNumResetCalls="+expectedNumResetCalls;
+        }
+    }
+    
+    private void doTestFullQueue(int queueLength, TestPattern... testPatterns) throws Exception {
+        init(queueLength);
+
+        // initialize observer with an initial contentChanged
+        // (see ChangeDispatcher#addObserver)
+        {
+            compositeObserver.contentChanged(p(-1), null);
+        }
+        // remove above first event right away
+        executeRunnables(runnableQ, 5);
+        received.clear();
+        resetCallCnt = 0;
+        
+        int k = 0;
+        int loopCnt = 0;
+        for (TestPattern testPattern : testPatterns) {
+            k++;
+            for(int i=0; i<testPattern.numEvents; i++) {
+                CommitInfo info;
+                if (!testPattern.excluded) {
+                    info = includingCommitInfo;
+                } else {
+                    info = excludingCommitInfo;
+                }
+                k++;
+                compositeObserver.contentChanged(p(k), info);
+            }
+            if (testPattern.flush) {
+                executeRunnables(runnableQ, testPattern.numEvents + testPattern.expectedNumEvents + testPattern.expectedNumResetCalls + 10);
+            }
+            assertEquals("loopCnt="+loopCnt, testPattern.expectedNumEvents, received.size());
+            assertEquals("loopCnt="+loopCnt, testPattern.expectedNumResetCalls, resetCallCnt);
+            received.clear();
+            resetCallCnt = 0;
+            loopCnt++;
+        }
+    }
+
+}

Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
