Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java	(working copy)
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
+
+/**
+ * A ChangeSetFilter is capable of inspecting a ChangeSet
+ * and deciding if the corresponding consumer
+ * (eg EventListener) is possibly interested in it
+ * or definitely not.
+ * <p>
+ * Falsely deciding to include is fine, falsely
+ * deciding to exclude is not.
+ */
+public interface ChangeSetFilter {
+
+    /**
+     * Decides if the commit belonging to the provided
+     * ChangeSet is potentially relevant to the listener
+     * or if it can definitely be excluded.
+     */
+	boolean excludes(ChangeSet changeSet);
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java	(working copy)
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
+
+public class ChangeSetFilterImpl implements ChangeSetFilter {
+
+    private final Set<String> rootIncludePaths;
+    private final Set<Pattern> includePathPatterns;
+    private final Set<Pattern> excludePathPatterns;
+    private final Set<String> parentNodeNames;
+    private final Set<String> parentNodeTypes;
+    private final Set<String> propertyNames;
+
+    public ChangeSetFilterImpl(@Nonnull Set<String> includedParentPaths, boolean isDeep, Set<String> excludedParentPaths,
+            Set<String> parentNodeNames, Set<String> parentNodeTypes, Set<String> propertyNames) {
+        this.rootIncludePaths = new HashSet<String>();
+        this.includePathPatterns = new HashSet<Pattern>();
+        for (String aRawIncludePath : includedParentPaths) {
+            final String aGlobbingIncludePath;
+            if (aRawIncludePath.contains("*")) {
+                // then isDeep is not applicable, it is already a glob path
+                aGlobbingIncludePath = aRawIncludePath;
+            } else {
+                aGlobbingIncludePath = !isDeep ? aRawIncludePath : concat(aRawIncludePath, "**");
+            }
+            this.rootIncludePaths.add(aRawIncludePath);
+            this.includePathPatterns.add(asPattern(aGlobbingIncludePath));
+        }
+        this.excludePathPatterns = new HashSet<Pattern>();
+        for (String aRawExcludePath : excludedParentPaths) {
+            this.excludePathPatterns.add(asPattern(concat(aRawExcludePath, "**")));
+        }
+        this.propertyNames = propertyNames == null ? null : new HashSet<String>(propertyNames);
+        this.parentNodeTypes = parentNodeTypes == null ? null : new HashSet<String>(parentNodeTypes);
+        this.parentNodeNames = parentNodeNames == null ? null : new HashSet<String>(parentNodeNames);
+    }
+
+    private Pattern asPattern(String patternWithGlobs) {
+        return Pattern.compile(GlobbingPathHelper.globPathAsRegex(patternWithGlobs));
+    }
+
+    @Override
+    public boolean excludes(ChangeSet changeSet) {
+        final Set<String> parentPaths = new HashSet<String>(changeSet.getParentPaths());
+
+        // first go through excludes to remove those that are explicitly
+        // excluded
+        if (this.excludePathPatterns.size() != 0) {
+            final Iterator<String> it = parentPaths.iterator();
+            while (it.hasNext()) {
+                final String aParentPath = it.next();
+                if (patternsMatch(this.excludePathPatterns, aParentPath)) {
+                    // if an exclude pattern matches, remove the parentPath
+                    it.remove();
+                }
+            }
+        }
+        // note that cut-off paths are not applied with excludes,
+        // eg if excludePaths contains /var/foo/bar and path contains /var/foo
+        // with a maxPathLevel of 2, that might very well mean that
+        // the actual path would have been /var/foo/bar, but we don't know.
+        // so we cannot exclude it here and thus have a potential false negative
+        // (ie we didn't exclude it in the prefilter)
+
+        // now remainingPaths contains what is not excluded,
+        // then check if it is included
+        boolean included = false;
+        for (String aPath : parentPaths) {
+            // direct set contains is fastest, lets try that first
+            if (this.rootIncludePaths.contains(aPath)) {
+                included = true;
+                break;
+            }
+            if (patternsMatch(this.includePathPatterns, aPath)) {
+                included = true;
+                break;
+            }
+        }
+
+        if (!included) {
+            // well then we can definitely say that this commit is excluded
+            return true;
+        }
+
+        if (this.propertyNames != null && this.propertyNames.size() != 0) {
+            included = false;
+            for (String aProperty : changeSet.getPropertyNames()) {
+                if (this.propertyNames.contains(aProperty)) {
+                    included = true;
+                    break;
+                }
+            }
+            // if propertyNames are defined then if we can't find any
+            // at this stage (if !included) then this equals to filtering out
+            if (!included) {
+                return true;
+            }
+            // otherwise we have found a match, but one of the
+            // nodeType/nodeNames
+            // could still filter out, so we have to continue...
+        }
+
+        if (this.parentNodeTypes != null && this.parentNodeTypes.size() != 0) {
+            included = false;
+            for (String aNodeType : changeSet.getParentNodeTypes()) {
+                if (this.parentNodeTypes.contains(aNodeType)) {
+                    included = true;
+                    break;
+                }
+            }
+            // same story here: if nodeTypes is defined and we can't find any
+            // match
+            // then we're done now
+            if (!included) {
+                return true;
+            }
+            // otherwise, again, continue
+        }
+
+        if (this.parentNodeNames != null && this.parentNodeNames.size() != 0) {
+            included = false;
+            for (String aNodeName : changeSet.getParentNodeNames()) {
+                if (this.parentNodeNames.contains(aNodeName)) {
+                    included = true;
+                    break;
+                }
+            }
+            // and a 3rd time, if we can't find any nodeName match
+            // here, then we're filtering out
+            if (!included) {
+                return true;
+            }
+        }
+
+        // at this stage we haven't found any exclude, so we're likely including
+        return false;
+    }
+
+    private static boolean patternsMatch(Set<Pattern> pathPatterns, String path) {
+        if (path == null) {
+            return false;
+        }
+        for (Pattern pathPattern : pathPatterns) {
+            if (pathPattern.matcher(path).matches()) {
+                return true;
+            }
+        }
+        return false;
+    }
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java	(revision 1768046)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java	(working copy)
@@ -42,6 +42,7 @@
 import com.google.common.collect.Iterables;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.plugins.nodetype.TypePredicate;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
 import org.apache.jackrabbit.oak.plugins.observation.filter.UniversalFilter.Selector;
 import org.apache.jackrabbit.oak.plugins.tree.RootFactory;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -59,6 +60,13 @@
     private boolean includeClusterLocal = true;
     private final List<String> subTrees = newArrayList();
     private Condition condition = includeAll();
+    private ChangeSetFilter changeSetFilter = new ChangeSetFilter() {
+        
+        @Override
+        public boolean excludes(ChangeSet changeSet) {
+            return false;
+        }
+    };
 
     private EventAggregator aggregator;
 
@@ -66,6 +74,12 @@
         @Nonnull
         EventFilter createFilter(@Nonnull NodeState before, @Nonnull NodeState after);
     }
+    
+    @Nonnull
+    public FilterBuilder setChangeSetFilter(@Nonnull ChangeSetFilter changeSetFilter) {
+        this.changeSetFilter = changeSetFilter;
+        return this;
+    }
 
     /**
      * Adds a path to the set of paths whose subtrees include all events of
@@ -380,6 +394,7 @@
             final EventAggregator aggregator = FilterBuilder.this.aggregator;
             final Iterable<String> subTrees = FilterBuilder.this.getSubTrees();
             final Condition condition = FilterBuilder.this.condition;
+            final ChangeSetFilter changeSetFilter = FilterBuilder.this.changeSetFilter;
 
             @Override
             public boolean includeCommit(@Nonnull String sessionId, @CheckForNull CommitInfo info) {
@@ -417,6 +432,11 @@
             public EventAggregator getEventAggregator() {
                 return aggregator;
             }
+            
+            @Override
+            public boolean excludes(ChangeSet changeSet) {
+                return changeSetFilter.excludes(changeSet);
+            }
         };
     }
 
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java	(revision 1768046)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java	(working copy)
@@ -28,8 +28,11 @@
 /**
  * Instance of this class provide a {@link EventFilter} for observation
  * events and a filter for commits.
+ * <p>
+ * In order to support OAK-4908 a FilterProvider
+ * extends ChangeSetFilter
  */
-public interface FilterProvider {
+public interface FilterProvider extends ChangeSetFilter {
 
     /**
      * Filter whole commits. Only commits for which this method returns
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java	(revision 1768046)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java	(working copy)
@@ -30,6 +30,7 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -313,4 +314,28 @@
     private static Logger getLogger(@Nonnull Observer observer) {
         return LoggerFactory.getLogger(checkNotNull(observer).getClass());
     }
+
+    
+    /** FOR TESTING ONLY 
+     * @throws InterruptedException **/
+    boolean waitUntilStopped(int timeout, TimeUnit unit) throws InterruptedException {
+        long done = System.currentTimeMillis() + unit.toMillis(timeout);
+        boolean added = false;
+        synchronized(this) {
+            added = queue.offer(STOP);
+            currentTask.onComplete(completionHandler);
+        }
+        while(done > System.currentTimeMillis()) {
+            synchronized(this) {
+                if (!added) {
+                    added = queue.offer(STOP);
+                }
+                if (queue.size() == 0 || (queue.size() == 1 && queue.peek() == STOP)) {
+                    return true;
+                }
+                wait(1);
+            }
+        }
+        return false;
+    }
 }
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java	(revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java	(working copy)
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSetBuilder;
+import org.junit.Test;
+
+public class ChangeSetFilterImplTest {
+
+    /** shortcut for creating a set of strings */
+    private Set<String> s(String... entries) {
+        return new HashSet<String>(Arrays.asList(entries));
+    }
+    
+    private ChangeSet newChangeSet(int maxPathDepth, Set<String> parentPaths,
+            Set<String> parentNodeNames,
+            Set<String> parentNodeTypes,
+            Set<String> propertyNames) {
+        ChangeSetBuilder changeSetBuilder = new ChangeSetBuilder(Integer.MAX_VALUE, maxPathDepth);
+        changeSetBuilder.getParentPaths().addAll(parentPaths);
+        changeSetBuilder.getParentNodeNames().addAll(parentNodeNames);
+        changeSetBuilder.getParentNodeTypes().addAll(parentNodeTypes);
+        changeSetBuilder.getPropertyNames().addAll(propertyNames);
+        return changeSetBuilder.build();
+    }
+
+    @Test
+    public void testIsDeepFalse() throws Exception {
+        ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), false, s("/excluded"), s(), s(), s());
+        
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/child1", "/child2"), s("child1", "child2"), s(), s())));
+        assertFalse(prefilter.excludes(newChangeSet(5, s("/", "/child2"), s("child2"), s(), s())));
+    }
+
+    @Test
+    public void testParentPathsIncludeExclude() throws Exception {
+        ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), true, s("/excluded"), s(), s(), s());
+        assertFalse(prefilter.excludes(newChangeSet(5, s("/a", "/b"), s("a", "b"), s(), s())));
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/excluded/foo", "/excluded/bar"), s("foo", "bar"), s(), s())));
+        
+        prefilter = new ChangeSetFilterImpl(s("/included"), true, s("/excluded"), s(), s(), s());
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/a", "/b"), s(), s(), s())));
+        assertFalse(prefilter.excludes(newChangeSet(5, s("/included/a", "/included/b"), s(), s(), s())));
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/excluded/foo", "/excluded/bar"), s(), s(), s())));
+
+        prefilter = new ChangeSetFilterImpl(s("/foo/**/included/**"), true /*ignored for globs */, s("/excluded"), s(), s(), s());
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/a", "/b"), s(), s(), s())));
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/included/a", "/included/b"), s(), s(), s())));
+        assertFalse(prefilter.excludes(newChangeSet(5, s("/foo/included/a"), s(), s(), s())));
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/included/b"), s(), s(), s())));
+        assertFalse(prefilter.excludes(newChangeSet(5, s("/foo/bar/included/a", "/included/b"), s(), s(), s())));
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/excluded/foo", "/excluded/bar"), s(), s(), s())));
+
+        prefilter = new ChangeSetFilterImpl(s("/main/**/included"), true, s("/main/excluded"), s(), s(), s());
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/main", "/main/foo"), s(), s(), s())));
+        assertFalse(prefilter.excludes(newChangeSet(5, s("/main/included", "/main/excluded"), s(), s(), s())));
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded/included", "/main/excluded"), s(), s(), s())));
+
+        prefilter = new ChangeSetFilterImpl(s("/main/included/**"), true, s("/main/excluded"), s(), s(), s());
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/main", "/main/foo"), s(), s(), s())));
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded"), s(), s(), s())));
+        assertFalse(prefilter.excludes(newChangeSet(5, s("/main/included"), s(), s(), s())));
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded/included", "/main/excluded"), s(), s(), s())));
+
+        prefilter = new ChangeSetFilterImpl(s("/main/inc-*/**"), true, s("/main/excluded"), s(), s(), s());
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/main", "/main/foo"), s(), s(), s())));
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded"), s(), s(), s())));
+        assertFalse(prefilter.excludes(newChangeSet(5, s("/main/inc-luded"), s(), s(), s())));
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded/included", "/main/excluded"), s(), s(), s())));
+    }
+    
+    @Test
+    public void testParentNodeNames() throws Exception {
+        ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), true, s(), s("foo", "bar"), s(), s());
+        assertFalse(prefilter.excludes(newChangeSet(5, s("/a/foo", "/b"), s("foo", "b"), s(), s())));
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/a/zoo", "/b"), s("zoo", "b"), s(), s())));
+        assertFalse(prefilter.excludes(newChangeSet(5, s("/a/zoo", "/bar"), s("zoo", "bar"), s(), s())));
+    }
+
+    @Test
+    public void testParentNodeTypes() throws Exception {
+        ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), true, s(), s(), s("nt:folder"), s());
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/a"), s("a"), s("nt:unstructured"), s())));
+        assertFalse(prefilter.excludes(newChangeSet(5, s("/a"), s("a"), s("nt:folder"), s())));
+    }
+
+    @Test
+    public void testPropertyNames() throws Exception {
+        ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), true, s(), s(), s(), s("jcr:data"));
+        assertTrue(prefilter.excludes(newChangeSet(5, s("/a"), s("a"), s(), s("myProperty"))));
+        assertFalse(prefilter.excludes(newChangeSet(5, s("/a"), s("a"), s(), s("jcr:data"))));
+    }
+
+}

Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java	(revision 1768046)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java	(working copy)
@@ -20,34 +20,50 @@
 package org.apache.jackrabbit.oak.spi.commit;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.io.Closeable;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import com.google.common.collect.Lists;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.observation.Filter;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.After;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
+import junit.framework.AssertionFailedError;
+
 public class BackgroundObserverTest {
     private static final CommitInfo COMMIT_INFO = new CommitInfo("no-session", null);
     public static final int CHANGE_COUNT = 1024;
 
     private final List<Runnable> assertions = Lists.newArrayList();
     private CountDownLatch doneCounter;
+    private final List<Closeable> closeables = Lists.newArrayList();
 
     /**
      * Assert that each observer of many running concurrently sees the same
-     * linearly sequence of commits (i.e. sees the commits in the correct order).
+     * linearly sequence of commits (i.e. sees the commits in the correct
+     * order).
      */
     @Test
     public void concurrentObservers() throws InterruptedException {
@@ -99,7 +115,7 @@
         return new BackgroundObserver(new Observer() {
             // Need synchronised list here to maintain correct memory barrier
             // when this is passed on to done(List<Runnable>)
-            final List<Runnable> assertions = Collections.synchronizedList(Lists.<Runnable>newArrayList());
+            final List<Runnable> assertions = Collections.synchronizedList(Lists.<Runnable> newArrayList());
             volatile NodeState previous;
 
             @Override
@@ -125,5 +141,288 @@
             }
         }, executor, queueLength);
     }
+    
+    class MyFilter implements Filter {
 
+        private boolean excludeNext;
+
+        void excludeNext(boolean excludeNext) {
+            this.excludeNext = excludeNext;
+        }
+
+        @Override
+        public boolean excludes(NodeState root, CommitInfo info) {
+            final boolean excludes = excludeNext;
+            excludeNext = false;
+            return excludes;
+        }
+        
+    }
+
+    class Recorder implements FilteringAwareObserver {
+
+        List<Pair> includedChanges = new LinkedList<Pair>();
+        private boolean pause;
+        private boolean pausing;
+
+        public Recorder() {
+        }
+        
+        @Override
+        public void contentChanged(NodeState before, NodeState after, CommitInfo info) {
+            includedChanges.add(new Pair(before, after));
+            maybePause();
+        }
+
+        public void maybePause() {
+            synchronized (this) {
+                try {
+                    while (pause) {
+                        pausing = true;
+                        this.notifyAll();
+                        try {
+                            this.wait();
+                        } catch (InterruptedException e) {
+                            // should not happen
+                        }
+                    }
+                } finally {
+                    pausing = false;
+                    this.notifyAll();
+                }
+            }
+        }
+
+        public synchronized void pause() {
+            this.pause = true;
+        }
+
+        public synchronized void unpause() {
+            this.pause = false;
+            this.notifyAll();
+        }
+
+        public boolean waitForPausing(int timeout, TimeUnit unit) throws InterruptedException {
+            final long done = System.currentTimeMillis() + unit.toMillis(timeout);
+            synchronized (this) {
+                while (!pausing && done > System.currentTimeMillis()) {
+                    this.wait();
+                }
+                return pausing;
+            }
+        }
+
+        public boolean waitForUnpausing(int timeout, TimeUnit unit) throws InterruptedException {
+            final long done = System.currentTimeMillis() + unit.toMillis(timeout);
+            synchronized (this) {
+                while (pausing && done > System.currentTimeMillis()) {
+                    this.wait();
+                }
+                return !pausing;
+            }
+        }
+
+    }
+
+    class Pair {
+        private final NodeState before;
+        private final NodeState after;
+
+        Pair(NodeState before, NodeState after) {
+            this.before = before;
+            this.after = after;
+        }
+
+        @Override
+        public String toString() {
+            return "Pair(before=" + before + ", after=" + after + ")";
+        }
+    }
+
+    class NodeStateGenerator {
+        Random r = new Random(1232131); // seed: repeatable tests
+        NodeBuilder builder = EMPTY_NODE.builder();
+
+        NodeState next() {
+            builder.setProperty("p", r.nextInt());
+            NodeState result = builder.getNodeState();
+            builder = result.builder();
+            return result;
+        }
+    }
+
+    private void assertMatches(String msg, List<Pair> expected, List<Pair> actual) {
+        assertEquals("size mismatch. msg=" + msg, expected.size(), actual.size());
+        for (int i = 0; i < expected.size(); i++) {
+            assertSame("mismatch of before at pos=" + i + ", msg=" + msg, expected.get(i).before, actual.get(i).before);
+            assertSame("mismatch of after at pos=" + i + ", msg=" + msg, expected.get(i).after, actual.get(i).after);
+        }
+    }
+
+    @After
+    public void shutDown() throws Exception {
+        for (Closeable closeable : closeables) {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                throw new AssertionFailedError(e.getMessage());
+            }
+        }
+    }
+
+    @Test
+    public void testExcludedAllCommits() throws Exception {
+        MyFilter filter = new MyFilter();
+        Recorder recorder = new Recorder();
+        ExecutorService executor = newSingleThreadExecutor();
+        FilteringObserver fo = new FilteringObserver(executor, 5, filter, recorder);
+        closeables.add(fo);
+        List<Pair> expected = new LinkedList<Pair>();
+        NodeStateGenerator generator = new NodeStateGenerator();
+        NodeState first = generator.next();
+        expected.add(new Pair(null, first));
+        fo.contentChanged(first, CommitInfo.EMPTY);
+        for (int i = 0; i < 100000; i++) {
+            filter.excludeNext(true);
+            fo.contentChanged(generator.next(), CommitInfo.EMPTY);
+        }
+        assertTrue("testExcludedAllCommits", fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
+        assertMatches("testExcludedAllCommits", expected, recorder.includedChanges);
+    }
+
+    @Test
+    public void testNoExcludedCommits() throws Exception {
+        MyFilter filter = new MyFilter();
+        Recorder recorder = new Recorder();
+        ExecutorService executor = newSingleThreadExecutor();
+        FilteringObserver fo = new FilteringObserver(executor, 10002, filter, recorder);
+        closeables.add(fo);
+        List<Pair> expected = new LinkedList<Pair>();
+        NodeStateGenerator generator = new NodeStateGenerator();
+        NodeState first = generator.next();
+        expected.add(new Pair(null, first));
+        fo.contentChanged(first, CommitInfo.EMPTY);
+        NodeState previous = first;
+        for (int i = 0; i < 10000; i++) {
+            filter.excludeNext(false);
+            NodeState next = generator.next();
+            expected.add(new Pair(previous, next));
+            previous = next;
+            fo.contentChanged(next, CommitInfo.EMPTY);
+        }
+        assertTrue("testNoExcludedCommits", fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
+        assertMatches("testNoExcludedCommits", expected, recorder.includedChanges);
+    }
+
+    @Test
+    public void testExcludeCommitsWithFullQueue() throws Exception {
+        MyFilter filter = new MyFilter();
+        Recorder recorder = new Recorder();
+        ExecutorService executor = newSingleThreadExecutor();
+        FilteringObserver fo = new FilteringObserver(executor, 2, filter, recorder);
+        closeables.add(fo);
+        List<Pair> expected = new LinkedList<Pair>();
+        NodeStateGenerator generator = new NodeStateGenerator();
+        recorder.pause();
+
+        // the first one will directly go to the recorder
+        NodeState firstIncluded = generator.next();
+        expected.add(new Pair(null, firstIncluded));
+        fo.contentChanged(firstIncluded, CommitInfo.EMPTY);
+
+        assertTrue("observer did not get called (yet?)", recorder.waitForPausing(5, TimeUnit.SECONDS));
+
+        // this one will be queued as #1
+        NodeState secondIncluded = generator.next();
+        expected.add(new Pair(firstIncluded, secondIncluded));
+        fo.contentChanged(secondIncluded, CommitInfo.EMPTY);
+
+        // this one will be queued as #2
+        NodeState thirdIncluded = generator.next();
+        expected.add(new Pair(secondIncluded, thirdIncluded));
+        fo.contentChanged(thirdIncluded, CommitInfo.EMPTY);
+
+        // this one will cause the queue to 'overflow' (full==true)
+        NodeState forthQueueFull = generator.next();
+        // not adding to expected, as this one ends up in the overflow element
+        fo.contentChanged(forthQueueFull, CommitInfo.EMPTY);
+
+        NodeState next;
+        // exclude when queue is full
+        filter.excludeNext(true);
+        next = generator.next();
+        // if excluded==true and full, hence not adding to expected
+        fo.contentChanged(next, CommitInfo.EMPTY);
+        // include after an exclude when queue was full
+        // => this is not supported. when the queue
+        filter.excludeNext(false);
+        next = generator.next();
+        // excluded==false BUT queue full, hence not adding to expected
+        fo.contentChanged(next, CommitInfo.EMPTY);
+        // let recorder continue
+        recorder.unpause();
+
+        recorder.waitForUnpausing(5, TimeUnit.SECONDS);
+        Thread.sleep(1000); // wait for 1 element to be dequeued at least
+        // exclude when queue is no longer full
+        filter.excludeNext(true);
+        NodeState seventhAfterQueueFull = generator.next();
+        // with the introduction of the FilteringAwareObserver this
+        // 'seventhAfterQueueFull' root will not be forwarded
+        // to the BackgroundObserver - thus entirely filtered
+
+        fo.contentChanged(seventhAfterQueueFull, CommitInfo.EMPTY);
+
+        // but with the introduction of FilteringAwareObserver the delivery
+        // only happens with non-filtered items, so adding yet another one now
+        filter.excludeNext(false);
+        NodeState last = generator.next();
+        // while above the "seventhAfterQueueFull" DOES get filtered, the next contentChange
+        // triggers the release of the 'queue full overflow element' (with commitInfo==null)
+        // and that we must add as expected()
+        expected.add(new Pair(thirdIncluded, seventhAfterQueueFull)); // commitInfo == null
+        expected.add(new Pair(seventhAfterQueueFull, last));
+        fo.contentChanged(last, CommitInfo.EMPTY);
+        
+        assertTrue("testExcludeCommitsWithFullQueue", fo.getBackgroundObserver().waitUntilStopped(10, TimeUnit.SECONDS));
+        assertMatches("testExcludeCommitsWithFullQueue", expected, recorder.includedChanges);
+    }
+
+    @Test
+    public void testExcludeSomeCommits() throws Exception {
+        ExecutorService executor = newSingleThreadExecutor();
+        for (int i = 0; i < 100; i++) {
+            doTestExcludeSomeCommits(i, executor);
+        }
+        for (int i = 100; i < 10000; i += 50) {
+            doTestExcludeSomeCommits(i, executor);
+        }
+    }
+
+    private void doTestExcludeSomeCommits(int cnt, Executor executor) throws Exception {
+        MyFilter filter = new MyFilter();
+        Recorder recorder = new Recorder();
+        FilteringObserver fo = new FilteringObserver(executor, cnt + 2, filter, recorder);
+        closeables.add(fo);
+        List<Pair> expected = new LinkedList<Pair>();
+        Random r = new Random(2343242); // seed: repeatable tests
+        NodeStateGenerator generator = new NodeStateGenerator();
+        NodeState first = generator.next();
+        expected.add(new Pair(null, first));
+        fo.contentChanged(first, CommitInfo.EMPTY);
+        NodeState previous = first;
+        for (int i = 0; i < cnt; i++) {
+            boolean excludeNext = r.nextInt(100) < 90;
+            filter.excludeNext(excludeNext);
+            NodeState next = generator.next();
+            if (!excludeNext) {
+                expected.add(new Pair(previous, next));
+            }
+            previous = next;
+            fo.contentChanged(next, CommitInfo.EMPTY);
+        }
+        assertTrue("cnt=" + cnt, fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
+        assertMatches("cnt=" + cnt, expected, recorder.includedChanges);
+    }
+
 }
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java	(revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java	(working copy)
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.plugins.observation.Filter;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.Test;
+
+public class PrefilteringBackgroundObserverTest {
+    
+    private final boolean EXCLUDED = true;
+    private final boolean INCLUDED = false;
+    
+    private List<Runnable> runnableQ;
+    private ExecutorService executor;
+    private CompositeObserver compositeObserver;
+    private List<ContentChanged> received;
+    private FilteringObserver filteringObserver;
+    private CommitInfo includingCommitInfo = new CommitInfo("includingSession", CommitInfo.OAK_UNKNOWN);
+    private CommitInfo excludingCommitInfo = new CommitInfo("excludingSession", CommitInfo.OAK_UNKNOWN);
+    private int resetCallCnt;
+    
+    public void init(int queueLength) throws Exception {
+        runnableQ = new LinkedList<Runnable>();
+        executor = new EnqueuingExecutorService(runnableQ);
+        compositeObserver = new CompositeObserver();
+        received = new LinkedList<ContentChanged>();
+        filteringObserver = new FilteringObserver(executor, queueLength, new Filter() {
+            
+            @Override
+            public boolean excludes(NodeState root, CommitInfo info) {
+                if (info == includingCommitInfo) {
+                    return false;
+                } else if (info == excludingCommitInfo) {
+                    return true;
+                } else if (info == null) {
+                    return false;
+                }
+                throw new IllegalStateException("only supporting include or exclude");
+            }
+        }, new FilteringAwareObserver() {
+            
+            NodeState previous;
+            
+            @Override
+            public void contentChanged(NodeState before, NodeState after, CommitInfo info) {
+                received.add(new ContentChanged(after, info));
+                if (previous !=null && previous != before) {
+                    resetCallCnt++;
+                }
+                previous = after;
+            }
+        });
+        compositeObserver.addObserver(filteringObserver);
+    }
+
+    private final class EnqueuingExecutorService extends AbstractExecutorService {
+        private final List<Runnable> runnableQ;
+
+        private EnqueuingExecutorService(List<Runnable> runnableQ) {
+            this.runnableQ = runnableQ;
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            runnableQ.add(command);
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            throw new IllegalStateException("nyi");
+        }
+
+        @Override
+        public void shutdown() {
+            throw new IllegalStateException("nyi");
+        }
+
+        @Override
+        public boolean isTerminated() {
+            throw new IllegalStateException("nyi");
+        }
+
+        @Override
+        public boolean isShutdown() {
+            throw new IllegalStateException("nyi");
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            throw new IllegalStateException("nyi");
+        }
+    }
+
+    class ContentChanged {
+        NodeState root;
+        CommitInfo info;
+        ContentChanged(NodeState root, CommitInfo info) {
+            this.root = root;
+            this.info = info;
+        }
+    }
+    
+    private static void executeRunnables(final List<Runnable> runnableQ, int num) {
+        for(int i=0; i<num; i++) {
+            for (Runnable runnable : new ArrayList<Runnable>(runnableQ)) {
+                runnable.run();
+            }
+        }
+    }
+
+    private static NodeState p(int k) {
+        return EMPTY_NODE.builder().setProperty("p", k).getNodeState();
+    }
+
+    @Test
+    public void testFlipping() throws Exception {
+        final int queueLength = 2000;
+        init(queueLength);
+
+        // initialize observer with an initial contentChanged
+        // (see ChangeDispatcher#addObserver)
+        {
+            compositeObserver.contentChanged(p(-1), null);
+        }
+        // Part 1 : first run with filtersEvaluatedMapWithEmptyObservers - empty or null shouldn't matter, it's excluded in both cases
+        for (int k = 0; k < 1000; k++) {
+            CommitInfo info;
+            if (k%2==1) {
+                info = includingCommitInfo;
+            } else {
+                info = excludingCommitInfo;
+            }
+            final NodeState p = p(k);
+            compositeObserver.contentChanged(p, info);
+            if (k%10 == 0) {
+                executeRunnables(runnableQ, 10);
+            }
+        }
+        executeRunnables(runnableQ, 10);
+        
+        assertEquals(501, received.size());
+        assertEquals(500, resetCallCnt);
+        
+        // Part 2 : run with filtersEvaluatedMapWithNullObservers - empty or null shouldn't matter, it's excluded in both cases
+        received.clear();
+        resetCallCnt = 0;
+        for (int k = 0; k < 1000; k++) {
+            CommitInfo info;
+            if (k%2==1) {
+                info = includingCommitInfo;
+            } else {
+                info = excludingCommitInfo;
+            }
+            final NodeState p = p(k);
+            compositeObserver.contentChanged(p, info);
+            if (k%10 == 0) {
+                executeRunnables(runnableQ, 10);
+            }
+        }
+        executeRunnables(runnableQ, 10);
+        
+        assertEquals(500, received.size());
+        assertEquals(500, resetCallCnt);
+        
+        // Part 3 : unlike the method name suggests, this variant tests with the filter disabled, so should receive all events normally
+        received.clear();
+        resetCallCnt = 0;
+        for (int k = 0; k < 1000; k++) {
+            CommitInfo info;
+            if (k%2==1) {
+                info = includingCommitInfo;
+            } else {
+                info = includingCommitInfo;
+            }
+            final NodeState p = p(k);
+            compositeObserver.contentChanged(p, info);
+            if (k%10 == 0) {
+                executeRunnables(runnableQ, 10);
+            }
+        }
+        executeRunnables(runnableQ, 10);
+        
+        assertEquals(1000, received.size());
+        assertEquals(0, resetCallCnt);
+    }
+
+    @Test
+    public void testFlipping2() throws Exception {
+        doTestFullQueue(6, 
+                new TestPattern(INCLUDED, 1, true, 1, 0),
+                new TestPattern(EXCLUDED, 5, true, 0, 0),
+                new TestPattern(INCLUDED, 2, true, 2, 1),
+                new TestPattern(EXCLUDED, 1, true, 0, 0),
+                new TestPattern(INCLUDED, 2, true, 2, 1));
+    }
+    
+    @Test
+    public void testQueueNotFull() throws Exception {
+        doTestFullQueue(20, 
+                // start: empty queue
+                new TestPattern(EXCLUDED, 1000, false, 0, 0),
+                // here: still empty, just the previousRoot is set to remember above NOOPs
+                new TestPattern(INCLUDED, 5, false, 0, 0),
+                // here: 5 changes are in the queue, the queue fits 20, way to go
+                new TestPattern(EXCLUDED, 500, false, 0, 0),
+                // still 5 in the queue
+                new TestPattern(INCLUDED, 5, false, 0, 0),
+                // now we added 2, queue still not full
+                new TestPattern(EXCLUDED, 0 /* only flush*/, true, 10, 2)
+                );
+    }
+    
+    @Test
+    public void testIncludeOnQueueFull() throws Exception {
+        doTestFullQueue(7, 
+                // start: empty queue
+                new TestPattern(EXCLUDED, 1000, false, 0, 0, 0, 0),
+                // here: still empty, just the previousRoot is set to remember above NOOPs
+                new TestPattern(INCLUDED, 5, false, 0, 0, 0, 6),
+                // here: 1 init and 5 changes are in the queue, the queue fits 7, so queue is almost full
+                new TestPattern(EXCLUDED, 500, false, 0, 0, 6, 6),
+                // still 6 in the queue, of 7
+                new TestPattern(INCLUDED, 5, false, 0, 0, 6, 7),
+                // now we added 2 (one NOOP and one of those 5), so the queue got full (==7)
+                new TestPattern(EXCLUDED, 0 /* only flush*/, true, 5, 1, 7, 0)
+                );
+    }
+    
+    @Test
+    public void testExcludeOnQueueFull2() throws Exception {
+        doTestFullQueue(1,
+                // start: empty queue
+                new TestPattern(INCLUDED, 10, false, 0, 0),
+                new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0),
+                new TestPattern(INCLUDED, 10, false, 0, 0),
+                new TestPattern(EXCLUDED, 10, false, 0, 0),
+                new TestPattern(INCLUDED, 10, false, 0, 0),
+                new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0),
+                new TestPattern(INCLUDED, 10, false, 0, 0),
+                new TestPattern(EXCLUDED, 10, false, 0, 0),
+                new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0),
+                new TestPattern(EXCLUDED, 10, false, 0, 0),
+                new TestPattern(INCLUDED, 10, false, 0, 0),
+                new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0));
+    }
+
+    @Test
+    public void testExcludeOnQueueFull1() throws Exception {
+        doTestFullQueue(4, 
+                // start: empty queue
+                new TestPattern(EXCLUDED, 1, false, 0, 0, 0, 0),
+                // here: still empty, just the previousRoot is set to remember above NOOP
+                new TestPattern(INCLUDED, 3, false, 0, 0, 0, 4),
+                // here: 3 changes are in the queue, the queue fits 3, so it just got full now
+                new TestPattern(EXCLUDED, 1, false, 0, 0, 4, 4),
+                // still full but it's ignored, so doesn't have any queue length effect
+                new TestPattern(INCLUDED, 3, false, 0, 0, 4, 4),
+                // adding 3 will not work, it will result in an overflow entry
+                new TestPattern(EXCLUDED, 0 /* only flush*/, true, 3, 1, 4, 0),
+                new TestPattern(INCLUDED, 1, false, 0, 0, 0, 1),
+                new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0, 1, 0)
+                );
+    }
+
+    class TestPattern {
+        final boolean flush;
+        final boolean excluded;
+        final int numEvents;
+        final int expectedNumEvents;
+        final int expectedNumResetCalls;
+        private int expectedQueueSizeAtStart = -1;
+        private int expectedQueueSizeAtEnd = -1;
+        TestPattern(boolean excluded, int numEvents, boolean flush, int expectedNumEvents, int expectedNumResetCalls) {
+            this.flush = flush;
+            this.excluded = excluded;
+            this.numEvents = numEvents;
+            this.expectedNumEvents = expectedNumEvents;
+            this.expectedNumResetCalls = expectedNumResetCalls;
+        }
+        TestPattern(boolean excluded, int numEvents, boolean flush, int expectedNumEvents, int expectedNumResetCalls, int expectedQueueSizeAtStart, int expectedQueueSizeAtEnd) {
+            this(excluded, numEvents, flush, expectedNumEvents, expectedNumResetCalls);
+            this.expectedQueueSizeAtStart = expectedQueueSizeAtStart;
+            this.expectedQueueSizeAtEnd = expectedQueueSizeAtEnd;
+        }
+        
+        @Override
+        public String toString() {
+            return "excluded="+excluded+", numEvents="+numEvents+", flush="+flush+", expectedNumEvents="+expectedNumEvents+", expectedNumResetCalls="+expectedNumResetCalls;
+        }
+    }
+    
+    private void doTestFullQueue(int queueLength, TestPattern... testPatterns) throws Exception {
+        init(queueLength);
+
+        // initialize observer with an initial contentChanged
+        // (see ChangeDispatcher#addObserver)
+        {
+            compositeObserver.contentChanged(p(-1), null);
+        }
+        // remove above first event right away
+        executeRunnables(runnableQ, 5);
+        received.clear();
+        resetCallCnt = 0;
+        
+        int k = 0;
+        int loopCnt = 0;
+        for (TestPattern testPattern : testPatterns) {
+            k++;
+            if (testPattern.expectedQueueSizeAtStart >= 0) {
+                assertEquals("loopCnt="+loopCnt+", queue size mis-match at start", 
+                        testPattern.expectedQueueSizeAtStart, filteringObserver.getBackgroundObserver().getMBean().getQueueSize());
+            }
+            for(int i=0; i<testPattern.numEvents; i++) {
+                CommitInfo info;
+                if (!testPattern.excluded) {
+                    info = includingCommitInfo;
+                } else {
+                    info = excludingCommitInfo;
+                }
+                k++;
+                compositeObserver.contentChanged(p(k), info);
+            }
+            if (testPattern.flush) {
+                executeRunnables(runnableQ, testPattern.numEvents + testPattern.expectedNumEvents + testPattern.expectedNumResetCalls + 10);
+            }
+            assertEquals("loopCnt="+loopCnt, testPattern.expectedNumEvents, received.size());
+            assertEquals("loopCnt="+loopCnt, testPattern.expectedNumResetCalls, resetCallCnt);
+            received.clear();
+            resetCallCnt = 0;
+            loopCnt++;
+            if (testPattern.expectedQueueSizeAtEnd >= 0) {
+                assertEquals("loopCnt="+loopCnt+", queue size mis-match at end", 
+                        testPattern.expectedQueueSizeAtEnd, filteringObserver.getBackgroundObserver().getMBean().getQueueSize());
+            }
+        }
+    }
+
+}

Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java	(revision 1768046)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java	(working copy)
@@ -45,6 +45,7 @@
 import org.apache.jackrabbit.oak.plugins.name.NamespaceEditorProvider;
 import org.apache.jackrabbit.oak.plugins.nodetype.TypeEditorProvider;
 import org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
 import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
 import org.apache.jackrabbit.oak.plugins.version.VersionHook;
 import org.apache.jackrabbit.oak.query.QueryEngineSettings;
@@ -120,6 +121,7 @@
             with(new NamespaceEditorProvider());
             with(new TypeEditorProvider());
             with(new ConflictValidatorProvider());
+            with(new ChangeCollectorProvider());
 
             with(new ReferenceEditorProvider());
             with(new ReferenceIndexProvider());
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java	(revision 1768046)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java	(working copy)
@@ -21,6 +21,7 @@
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_COUNTER;
 import static org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_DURATION;
+import static org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider.COMMIT_CONTEXT_OBSERVATION_CHANGESET;
 import static org.apache.jackrabbit.oak.plugins.observation.filter.VisibleFilter.VISIBLE_FILTER;
 import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
 import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerObserver;
@@ -32,40 +33,46 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import javax.jcr.observation.Event;
 import javax.jcr.observation.EventIterator;
 import javax.jcr.observation.EventListener;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Monitor;
-import com.google.common.util.concurrent.Monitor.Guard;
 import org.apache.jackrabbit.api.jmx.EventListenerMBean;
 import org.apache.jackrabbit.commons.observation.ListenerTracker;
 import org.apache.jackrabbit.oak.api.ContentSession;
 import org.apache.jackrabbit.oak.namepath.NamePathMapper;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
 import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
+import org.apache.jackrabbit.oak.plugins.observation.Filter;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringDispatcher;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver;
 import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterConfigMBean;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
 import org.apache.jackrabbit.oak.plugins.observation.filter.Filters;
+import org.apache.jackrabbit.oak.plugins.observation.filter.ChangeSetFilter;
 import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
 import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
-import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
 import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
+import org.apache.jackrabbit.oak.stats.MeterStats;
 import org.apache.jackrabbit.oak.stats.StatisticManager;
-import org.apache.jackrabbit.oak.stats.MeterStats;
 import org.apache.jackrabbit.oak.stats.TimerStats;
 import org.apache.jackrabbit.oak.util.PerfLogger;
 import org.apache.jackrabbit.stats.TimeSeriesMax;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Monitor;
+import com.google.common.util.concurrent.Monitor.Guard;
+
 /**
  * A {@code ChangeProcessor} generates observation {@link javax.jcr.observation.Event}s
  * based on a {@link FilterProvider filter} and delivers them to an {@link EventListener}.
@@ -73,11 +80,27 @@
  * After instantiation a {@code ChangeProcessor} must be started in order to start
  * delivering observation events and stopped to stop doing so.
  */
-class ChangeProcessor implements Observer {
+class ChangeProcessor implements FilteringAwareObserver {
     private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);
     private static final PerfLogger PERF_LOGGER = new PerfLogger(
             LoggerFactory.getLogger(ChangeProcessor.class.getName() + ".perf"));
 
+    private static enum FilterResult {
+        /** marks a commit as to be included, ie delivered.
+         * It's okay to falsely mark a commit as included,
+         * since filtering (as part of converting to events)
+         * will be applied at a later stage again. */
+        INCLUDE,
+        /** mark a commit as not of interest to this ChangeProcessor.
+         * Exclusion is definite, ie it's not okay to falsely
+         * mark a commit as excluded */
+        EXCLUDE, 
+        /** mark a commit as included but indicate that this
+         * is not a result of prefiltering but that prefiltering
+         * was skipped/not applicable for some reason */
+        PREFILTERING_SKIPPED
+    }
+    
     /**
      * Fill ratio of the revision queue at which commits should be delayed
      * (conditional of {@code commitRateLimiter} being non {@code null}).
@@ -89,7 +112,12 @@
      * kicks in.
      */
     public static final int MAX_DELAY;
-
+    
+    /** The test mode can be used to just verify if prefiltering would have
+     * correctly done its job and warn if that's not the case.
+     */
+    private static final boolean PREFILTERING_TESTMODE;
+    
     // OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using System.properties for now
     static {
         final String delayThresholdStr = System.getProperty("oak.commitRateLimiter.delayThreshold");
@@ -114,6 +142,18 @@
         }
         DELAY_THRESHOLD = delayThreshold;
         MAX_DELAY = maxDelay;
+
+        final String prefilteringTestModeStr = System.getProperty("oak.observation.prefilteringTestMode");
+        boolean prefilteringTestModeBool = false; // default is enabled
+        try {
+            if (prefilteringTestModeStr != null && prefilteringTestModeStr.length() != 0) {
+                prefilteringTestModeBool = Boolean.parseBoolean(prefilteringTestModeStr);
+                LOG.info("<clinit> using oak.observation.prefilteringTestMode = " + prefilteringTestModeBool);
+            }
+        } catch(RuntimeException e) {
+            LOG.warn("<clinit> could not parse oak.observation.prefilteringTestMode, using default (" + prefilteringTestModeBool + "): " + e, e);
+        }
+        PREFILTERING_TESTMODE = prefilteringTestModeBool;
     }
     
     private static final AtomicInteger COUNTER = new AtomicInteger();
@@ -145,8 +185,22 @@
      */
     private CompositeRegistration registration;
 
-    private volatile NodeState previousRoot;
-
+    /**
+     * for statistics: tracks how many times prefiltering excluded a commit
+     */
+    private int prefilterExcludeCount;
+    
+    /**
+     * for statistics: tracks how many times prefiltering included a commit
+     */
+    private int prefilterIncludeCount;
+    
+    /**
+     * for statistics: tracks how many times prefiltering was ignored (not evaluated at all),
+     * either because it was disabled, queue too small, CommitInfo null or CommitContext null
+     */
+    private int prefilterSkipCount;
+    
     public ChangeProcessor(
             ContentSession contentSession,
             NamePathMapper namePathMapper,
@@ -180,6 +234,29 @@
         return filterProvider.get();
     }
 
+    @Nonnull
+    public ChangeProcessorMBean getMBean() {
+        return new ChangeProcessorMBean() {
+
+            @Override
+            public int getPrefilterExcludeCount() {
+                return prefilterExcludeCount;
+            }
+
+            @Override
+            public int getPrefilterIncludeCount() {
+                return prefilterIncludeCount;
+            }
+
+            @Override
+            public int getPrefilterSkipCount() {
+                return prefilterSkipCount;
+            }
+
+        };
+    }
+
+    
     /**
      * Start this change processor
      * @param whiteboard  the whiteboard instance to used for scheduling individual
@@ -190,16 +267,18 @@
         checkState(registration == null, "Change processor started already");
         final WhiteboardExecutor executor = new WhiteboardExecutor();
         executor.start(whiteboard);
-        final BackgroundObserver observer = createObserver(executor);
+        final FilteringObserver filteringObserver = createObserver(executor);
         listenerId = COUNTER.incrementAndGet() + "";
         Map<String, String> attrs = ImmutableMap.of(LISTENER_ID, listenerId);
         String name = tracker.toString();
         registration = new CompositeRegistration(
-            registerObserver(whiteboard, observer),
+            registerObserver(whiteboard, filteringObserver),
             registerMBean(whiteboard, EventListenerMBean.class,
                     tracker.getListenerMBean(), "EventListener", name, attrs),
             registerMBean(whiteboard, BackgroundObserverMBean.class,
-                    observer.getMBean(), BackgroundObserverMBean.TYPE, name, attrs),
+                    filteringObserver.getBackgroundObserver().getMBean(), BackgroundObserverMBean.TYPE, name, attrs),
+            registerMBean(whiteboard, ChangeProcessorMBean.class,
+                    getMBean(), ChangeProcessorMBean.TYPE, name, attrs),
             //TODO If FilterProvider gets changed later then MBean would need to be
             // re-registered
             registerMBean(whiteboard, FilterConfigMBean.class,
@@ -207,7 +286,7 @@
             new Registration() {
                 @Override
                 public void unregister() {
-                    observer.close();
+                    filteringObserver.close();
                 }
             },
             new Registration() {
@@ -225,8 +304,9 @@
         );
     }
 
-    private BackgroundObserver createObserver(final WhiteboardExecutor executor) {
-        return new BackgroundObserver(this, executor, queueLength) {
+    private FilteringObserver createObserver(final WhiteboardExecutor executor) {
+        FilteringDispatcher fd = new FilteringDispatcher(this);
+        BackgroundObserver bo = new BackgroundObserver(fd, executor, queueLength) {
             private volatile long delay;
             private volatile boolean blocking;
 
@@ -287,7 +367,43 @@
                 }
             }
 
+            
+            @Override
+            public String toString() {
+                return "Prefiltering BackgroundObserver for "+ChangeProcessor.this;
+            }
         };
+        return new FilteringObserver(bo, new Filter() {
+            
+            @Override
+            public boolean excludes(NodeState root, CommitInfo info) {
+                if (PREFILTERING_TESTMODE) {
+                    // then we don't prefilter but only test later
+                    prefilterSkipCount++;
+                    return false;
+                }
+                final FilterResult filterResult = evalPrefilter(root, info, getChangeSet(info));
+                switch (filterResult) {
+                case PREFILTERING_SKIPPED: {
+                    prefilterSkipCount++;
+                    return false;
+                }
+                case EXCLUDE: {
+                    prefilterExcludeCount++;
+                    return true;
+                }
+                case INCLUDE: {
+                    prefilterIncludeCount++;
+                    return false;
+                }
+                default: {
+                    LOG.info("isExcluded: unknown/unsupported filter result: " + filterResult);
+                    prefilterSkipCount++;
+                    return false;
+                }
+                }
+            }
+        });
     }
 
     private final Monitor runningMonitor = new Monitor();
@@ -339,16 +455,48 @@
         }
     }
 
+    /**
+     * Utility method that extracts the ChangeSet from a CommitInfo if possible.
+     * @param info
+     * @return
+     */
+    public static ChangeSet getChangeSet(CommitInfo info) {
+        if (info == null) {
+            return null;
+        }
+        CommitContext context = (CommitContext) info.getInfo().get(CommitContext.NAME);
+        if (context == null) {
+            return null;
+        }
+        return (ChangeSet) context.get(COMMIT_CONTEXT_OBSERVATION_CHANGESET);
+    }
+
     @Override
-    public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
-        if (previousRoot != null) {
+    public void contentChanged(NodeState before, NodeState after, CommitInfo info) {
+        FilterResult prefilterTestResult = null;
+        if (PREFILTERING_TESTMODE) {
+            // OAK-4908 test mode: when the ChangeCollectorProvider is enabled
+            // there is the option to have the ChangeProcessors run in so-called
+            // 'test mode'. In this test mode the prefiltering is not applied,
+            // but instead verified if it *would have prefiltered correctly*.
+            // that test is therefore done at dequeue-time, hence in
+            // contentChanged
+            // TODO: remove this testing mechanism after a while
             try {
+                prefilterTestResult = evalPrefilter(after, info, getChangeSet(info));
+            } catch (Exception e) {
+                LOG.warn("contentChanged: exception in wouldBeExcludedCommit: " + e, e);
+            }
+        }
+        if (before != null) {
+            try {
                 long start = PERF_LOGGER.start();
                 FilterProvider provider = filterProvider.get();
+                boolean onEventInvoked = false;
                 // FIXME don't rely on toString for session id
                 if (provider.includeCommit(contentSession.toString(), info)) {
-                    EventFilter filter = provider.getFilter(previousRoot, root);
-                    EventIterator events = new EventQueue(namePathMapper, info, previousRoot, root,
+                    EventFilter filter = provider.getFilter(before, after);
+                    EventIterator events = new EventQueue(namePathMapper, info, before, after,
                             provider.getSubTrees(), Filters.all(filter, VISIBLE_FILTER), 
                             provider.getEventAggregator());
 
@@ -361,6 +509,7 @@
                         }
                         try {
                             CountingIterator countingEvents = new CountingIterator(events);
+                            onEventInvoked = true;
                             eventListener.onEvent(countingEvents);
                             countingEvents.updateCounters(eventCount, eventDuration);
                         } finally {
@@ -371,14 +520,33 @@
                         }
                     }
                 }
+                if (prefilterTestResult != null) {
+                    // OAK-4908 test mode
+                    if (prefilterTestResult == FilterResult.EXCLUDE && onEventInvoked) {
+                        // this is not ok, an event would have gotten
+                        // excluded-by-prefiltering even though
+                        // it actually got an event.
+                        LOG.warn("contentChanged: delivering event which would have been prefiltered, "
+                                + "info={}, this={}, listener={}", info, this, eventListener);
+                    } else if (prefilterTestResult == FilterResult.INCLUDE && !onEventInvoked && info != null
+                            && info != CommitInfo.EMPTY) {
+                        // this can occur arbitrarily frequent. as prefiltering
+                        // is not perfect, it can
+                        // have false negatives - ie it can include even though
+                        // no event is then created
+                        // hence we can only really log at debug here
+                        LOG.debug(
+                                "contentChanged: no event to deliver but not prefiltered, info={}, this={}, listener={}",
+                                info, this, eventListener);
+                    }
+                }
                 PERF_LOGGER.end(start, 100,
                         "Generated events (before: {}, after: {})",
-                        previousRoot, root);
+                        before, after);
             } catch (Exception e) {
                 LOG.warn("Error while dispatching observation events for " + tracker, e);
             }
         }
-        previousRoot = root;
     }
 
     private static class CountingIterator implements EventIterator {
@@ -490,4 +658,45 @@
                 + ", commitRateLimiter=" + commitRateLimiter
                 + ", running=" + running.isSatisfied() + "]";
     }
+
+    /**
+     * Evaluate the prefilter for a given commit.
+     * @param changeSet 
+     * 
+     * @return a FilterResult indicating either inclusion, exclusion or
+     *         inclusion-due-to-skipping. The latter is used to reflect
+     *         prefilter evaluation better in statistics (as it could also have
+     *         been reported just as include)
+     */
+    private FilterResult evalPrefilter(NodeState root, CommitInfo info, ChangeSet changeSet) {
+        if (info == null) {
+            return FilterResult.PREFILTERING_SKIPPED;
+        }
+        if (root == null) {
+            // likely only occurs at startup
+            // we can't do any diffing etc, so just not exclude it
+            return FilterResult.PREFILTERING_SKIPPED;
+        }
+
+        final FilterProvider fp = filterProvider.get();
+        // FIXME don't rely on toString for session id
+        if (!fp.includeCommit(contentSession.toString(), info)) {
+            // 'classic' (and cheap pre-) filtering
+            return FilterResult.EXCLUDE;
+        }
+        if (changeSet == null) {
+            // then can't do any prefiltering since it was not
+            // able to complete the sets (within the given boundaries)
+            // (this corresponds to a large commit, which thus can't
+            // go through prefiltering)
+            return FilterResult.PREFILTERING_SKIPPED;
+        }
+
+        final ChangeSetFilter prefilter = fp;
+        if (prefilter.excludes(changeSet)) {
+            return FilterResult.EXCLUDE;
+        } else {
+            return FilterResult.INCLUDE;
+        }
+    }
 }
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java	(revision 0)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java	(working copy)
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.jcr.observation;
+
+public interface ChangeProcessorMBean {
+    String TYPE = "ChangeProcessorStats";
+
+    /** Returns the number of commits that were excluded by the prefiltering mechanism */
+    int getPrefilterExcludeCount();
+
+    /** Returns the number of commits that were included by the prefiltering mechanism */
+    int getPrefilterIncludeCount();
+    
+    /** Returns the number of commits that skipped prefiltering, thus got included */
+    int getPrefilterSkipCount();
+}

Property changes on: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java	(revision 1768046)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java	(working copy)
@@ -84,6 +84,12 @@
                 referenceInterface = BackgroundObserverMBean.class,
                 policy = ReferencePolicy.DYNAMIC,
                 cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
+        @Reference(name = "changeProcessorMBean",
+                bind = "bindChangeProcessorMBean",
+                unbind = "unbindChangeProcessorMBean",
+                referenceInterface = ChangeProcessorMBean.class,
+                policy = ReferencePolicy.DYNAMIC,
+                cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
         @Reference(name = "filterConfigMBean",
                 bind = "bindFilterConfigMBean",
                 unbind = "unbindFilterConfigMBean",
@@ -96,6 +102,7 @@
     private final AtomicInteger observerCount = new AtomicInteger();
     private final Map<ObjectName, EventListenerMBean> eventListeners = Maps.newConcurrentMap();
     private final Map<ObjectName, BackgroundObserverMBean> bgObservers = Maps.newConcurrentMap();
+    private final Map<ObjectName, ChangeProcessorMBean> changeProcessors = Maps.newConcurrentMap();
     private final Map<ObjectName, FilterConfigMBean> filterConfigs = Maps.newConcurrentMap();
 
     private Registration mbeanReg;
@@ -201,6 +208,11 @@
                     m.observerMBean = ef.getValue();
                 }
             }
+            for (Map.Entry<ObjectName, ChangeProcessorMBean> ef : changeProcessors.entrySet()){
+                if (Objects.equal(getListenerId(ef.getKey()), listenerId)){
+                    m.changeProcessorMBean = ef.getValue();
+                }
+            }
             mbeans.add(m);
         }
         return mbeans;
@@ -249,6 +261,16 @@
     }
 
     @SuppressWarnings("unused")
+    protected void bindChangeProcessorMBean(ChangeProcessorMBean mbean, Map<String, ?> config){
+    	changeProcessors.put(getObjectName(config), mbean);
+    }
+
+    @SuppressWarnings("unused")
+    protected void unbindChangeProcessorMBean(ChangeProcessorMBean mbean, Map<String, ?> config){
+    	changeProcessors.remove(getObjectName(config));
+    }
+
+    @SuppressWarnings("unused")
     protected void bindListenerMBean(EventListenerMBean mbean, Map<String, ?> config){
         eventListeners.put(getObjectName(config), mbean);
     }
@@ -280,6 +302,7 @@
     private static class ListenerMBeans {
         EventListenerMBean eventListenerMBean;
         BackgroundObserverMBean observerMBean;
+        ChangeProcessorMBean changeProcessorMBean;
         FilterConfigMBean filterConfigMBean;
     }
 
@@ -301,6 +324,9 @@
                 "ratioOfTimeSpentProcessingEvents",
                 "eventConsumerTimeRatio",
                 "queueBacklogMillis",
+                "prefilterSkips",
+                "prefilterExcludes",
+                "prefilterIncludes",
                 "queueSize",
                 "localEventCount",
                 "externalEventCount",
@@ -331,6 +357,9 @@
                 SimpleType.INTEGER,
                 SimpleType.INTEGER,
                 SimpleType.INTEGER,
+                SimpleType.INTEGER,
+                SimpleType.INTEGER,
+                SimpleType.INTEGER,
                 SimpleType.STRING,
                 SimpleType.BOOLEAN,
                 SimpleType.BOOLEAN,
@@ -376,6 +405,9 @@
                     mbeans.eventListenerMBean.getRatioOfTimeSpentProcessingEvents(),
                     mbeans.eventListenerMBean.getEventConsumerTimeRatio(),
                     mbeans.eventListenerMBean.getQueueBacklogMillis(),
+                    mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterSkipCount(),
+                    mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterExcludeCount(),
+                    mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterIncludeCount(),
                     mbeans.observerMBean.getQueueSize(),
                     mbeans.observerMBean.getLocalEventCount(),
                     mbeans.observerMBean.getExternalEventCount(),
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java	(revision 1768046)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java	(working copy)
@@ -512,4 +512,21 @@
         return this;
     }
 
+    /**
+     * A hook called by the ObservationManagerImpl before creating the ChangeSetFilterImpl
+     * which allows this filter to adjust the includePaths according to its
+     * enabled flags.
+     * <p>
+     * This is used to set the includePath to be '/' in case includeAncestorRemove
+     * is set. The reason for this is that we must catch parent removals and can thus
+     * not apply the normally applied prefilter paths.
+     * @param includePaths the set to adjust depending on filter flags
+     */
+    void adjustPrefilterIncludePaths(Set<String> includePaths) {
+        if (includeAncestorRemove) {
+            includePaths.clear();
+            includePaths.add("/");
+        }
+    }
+
 }
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java	(revision 1768046)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java	(working copy)
@@ -62,6 +62,7 @@
 import org.apache.jackrabbit.oak.plugins.observation.filter.UniversalFilter.Selector;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
 import org.apache.jackrabbit.oak.plugins.observation.filter.PermissionProviderFactory;
+import org.apache.jackrabbit.oak.plugins.observation.filter.ChangeSetFilterImpl;
 import org.apache.jackrabbit.oak.plugins.observation.filter.Selectors;
 import org.apache.jackrabbit.oak.spi.commit.Observable;
 import org.apache.jackrabbit.oak.spi.security.authorization.AuthorizationConfiguration;
@@ -278,6 +279,7 @@
 
         List<Condition> excludeConditions = createExclusions(filterBuilder, excludedPaths);
 
+        final String[] validatedNodeTypeNames = validateNodeTypeNames(nodeTypeName);
         Selector nodeTypeSelector = Selectors.PARENT;
         boolean deleteSubtree = true;
         if (oakEventFilter != null) {
@@ -304,7 +306,7 @@
                     filterBuilder.moveSubtree(),
                     filterBuilder.eventType(eventTypes),
                     filterBuilder.uuid(Selectors.PARENT, uuids),
-                    filterBuilder.nodeType(nodeTypeSelector, validateNodeTypeNames(nodeTypeName)),
+                    filterBuilder.nodeType(nodeTypeSelector, validatedNodeTypeNames),
                     filterBuilder.accessControl(permissionProviderFactory));
         if (oakEventFilter != null) {
             condition = oakEventFilter.wrapMainCondition(condition, filterBuilder, permissionProviderFactory);
@@ -319,6 +321,16 @@
         ListenerTracker tracker = new WarningListenerTracker(
                 !noExternal, listener, eventTypes, absPath, isDeep, uuids, nodeTypeName, noLocal);
 
+        if (oakEventFilter != null) {
+            oakEventFilter.adjustPrefilterIncludePaths(includePaths);
+        }
+        
+        // OAK-4908 : prefiltering support. here we have explicit yes/no/maybe filtering
+        // for things like propertyNames/nodeTypes/nodeNames/paths which cannot be 
+        // applied on the full-fledged filterBuilder above but requires an explicit 'prefilter' for that.
+        filterBuilder.setChangeSetFilter(new ChangeSetFilterImpl(includePaths, isDeep, excludedPaths, null,
+                validatedNodeTypeNames == null ? null : newHashSet(validatedNodeTypeNames), null));
+        
         addEventListener(listener, tracker, filterBuilder.build());
     }
 
Index: oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
===================================================================
--- oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java	(revision 1768046)
+++ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java	(working copy)
@@ -436,7 +436,25 @@
             observationManager.removeEventListener(listener);
         }
     }
+    
+    @Test
+    public void propertyFilter() throws Exception {
+        Node root = getNode("/");
+        ExpectationListener listener = new ExpectationListener();
+        observationManager.addEventListener(listener, PROPERTY_ADDED, "/a/b", false, null, null, false);
+        Node a = root.addNode("a");
+        Node b = a.addNode("b");
+        listener.expect("/a/b/jcr:primaryType", PROPERTY_ADDED);
 
+        listener.expectAdd(b.setProperty("propName", 1));
+    	root.getSession().save();
+
+    	List<Expectation> missing = listener.getMissing(TIME_OUT, TimeUnit.SECONDS);
+        assertTrue("Missing events: " + missing, missing.isEmpty());
+        List<Event> unexpected = listener.getUnexpected();
+        assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+    }
+    
     @Test
     public void pathFilter() throws Exception {
         final String path = "/events/only/here";
@@ -1515,7 +1533,7 @@
 
         filter = new JackrabbitEventFilter();
         filter.setEventTypes(ALL_EVENTS);
-        filter = FilterFactory.wrap(filter).withIncludeGlobPaths(TEST_PATH + "/a3/**/y/*");
+        filter = FilterFactory.wrap(filter).withIncludeGlobPaths(TEST_PATH + "/a3/**/y");
         oManager.addEventListener(listener, filter);
         cp = oManager.getChangeProcessor(listener);
         assertNotNull(cp);
