Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java	(revision 1763448)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java	(working copy)
@@ -42,6 +42,7 @@
 import com.google.common.collect.Iterables;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.plugins.nodetype.TypePredicate;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
 import org.apache.jackrabbit.oak.plugins.observation.filter.UniversalFilter.Selector;
 import org.apache.jackrabbit.oak.plugins.tree.RootFactory;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -59,11 +60,24 @@
     private boolean includeClusterLocal = true;
     private final List<String> subTrees = newArrayList();
     private Condition condition = includeAll();
+    private Prefilter prefilter = new Prefilter() {
+        
+        @Override
+        public boolean excludeCommit(ChangeSet changeSet) {
+            return false;
+        }
+    };
 
     public interface Condition {
         @Nonnull
         EventFilter createFilter(@Nonnull NodeState before, @Nonnull NodeState after);
     }
+    
+    @Nonnull
+    public FilterBuilder setPrefilter(@Nonnull Prefilter prefilter) {
+        this.prefilter = prefilter;
+        return this;
+    }
 
     /**
      * Adds a path to the set of paths whose subtrees include all events of
@@ -372,6 +386,7 @@
             final boolean includeClusterLocal = FilterBuilder.this.includeClusterLocal;
             final Iterable<String> subTrees = FilterBuilder.this.getSubTrees();
             final Condition condition = FilterBuilder.this.condition;
+            final Prefilter prefilter = FilterBuilder.this.prefilter;
 
             @Override
             public boolean includeCommit(@Nonnull String sessionId, @CheckForNull CommitInfo info) {
@@ -404,6 +419,11 @@
             private boolean isExternal(CommitInfo info) {
                 return info == null;
             }
+            
+            @Override
+            public boolean excludeCommit(ChangeSet prefilterChangeSet) {
+                return prefilter.excludeCommit(prefilterChangeSet);
+            }
         };
     }
 
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java	(revision 1763448)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java	(working copy)
@@ -28,8 +28,11 @@
 /**
  * Instance of this class provide a {@link EventFilter} for observation
  * events and a filter for commits.
+ * <p>
+ * In order to support OAK-4908 a FilterProvider
+ * extends Prefilter
  */
-public interface FilterProvider {
+public interface FilterProvider extends Prefilter {
 
     /**
      * Filter whole commits. Only commits for which this method returns
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/Prefilter.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/Prefilter.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/Prefilter.java	(working copy)
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
+
+/**
+ * A Prefilter is capable of inspecting a ChangeSet
+ * and deciding if the corresponding consumer
+ * (eg EventListener) is possibly interested in it
+ * or definitely not.
+ * <p>
+ * Falsely deciding to include is fine, falsely
+ * deciding to exclude is not.
+ */
+public interface Prefilter {
+
+    /**
+     * Decides if the commit belonging to the provided
+     * ChangeSet is potentially relevant to the listener
+     * or if it can definitely be excluded.
+     */
+	boolean excludeCommit(ChangeSet changeSet);
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/Prefilter.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterImpl.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterImpl.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterImpl.java	(working copy)
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static org.apache.jackrabbit.oak.commons.PathUtils.elements;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
+
+public class PrefilterImpl implements Prefilter {
+
+    private final Set<String> rootIncludePaths;
+    private final Set<Pattern> includePathPatterns;
+    private final Set<Pattern> excludePathPatterns;
+    private final Set<String> parentNodeNames;
+    private final Set<String> parentNodeTypes;
+    private final Set<String> propertyNames;
+
+    public PrefilterImpl(@Nonnull Set<String> includedParentPaths, boolean isDeep, Set<String> excludedParentPaths,
+            Set<String> parentNodeNames, Set<String> parentNodeTypes, Set<String> propertyNames) {
+        this.rootIncludePaths = new HashSet<String>();
+        this.includePathPatterns = new HashSet<Pattern>();
+        for (String aRawIncludePath : includedParentPaths) {
+            final String aGlobbingIncludePath = !isDeep ? aRawIncludePath : concat(aRawIncludePath, "**");
+            this.rootIncludePaths.add(aRawIncludePath);
+            this.includePathPatterns.add(asPattern(aGlobbingIncludePath));
+        }
+        this.excludePathPatterns = new HashSet<Pattern>();
+        for (String aRawExcludePath : excludedParentPaths) {
+            this.excludePathPatterns.add(asPattern(concat(aRawExcludePath, "**")));
+        }
+        this.propertyNames = propertyNames == null ? null : new HashSet<String>(propertyNames);
+        this.parentNodeTypes = parentNodeTypes == null ? null : new HashSet<String>(parentNodeTypes);
+        this.parentNodeNames = parentNodeNames == null ? null : new HashSet<String>(parentNodeNames);
+    }
+
+    private Pattern asPattern(String patternWithGlobs) {
+        return Pattern.compile(globAsRegex(patternWithGlobs));
+    }
+
+    public static String globAsRegex(String patternWithGlobs) {
+        if (patternWithGlobs == null) {
+            throw new IllegalArgumentException("patternWithGlobs must not be null");
+        }
+        /*
+         * Pattern.matches("\\Q\\E(/[^/])*\\Q\\E", "/");
+         * Pattern.matches("(/[^/])*\\Q\\E", "/");
+         * Pattern.matches("\\Q\\E(/[^/])*", "/"); 
+         * Pattern.matches("(/[^/]*)*", "/");
+         */
+        return "\\Q" + patternWithGlobs.replace("/**", "\\E(/[^/]*)*\\Q").replace("/*", "/\\E[^/]*\\Q") + "\\E";
+    }
+
+    public static boolean match(String patternWithGlobs, String str) {
+        final String regex = globAsRegex(patternWithGlobs);
+        System.out.print("regexp='" + regex + "' ");
+        return Pattern.matches(regex, str);
+    }
+
+    @Override
+    public boolean excludeCommit(ChangeSet changeSet) {
+        final Set<String> parentPaths = new HashSet<String>(changeSet.getParentPaths());
+
+        // first go through excludes to remove those that are explicitly
+        // excluded
+        if (this.excludePathPatterns.size() != 0) {
+            final Iterator<String> it = parentPaths.iterator();
+            while (it.hasNext()) {
+                final String aParentPath = it.next();
+                if (patternsMatch(this.excludePathPatterns, aParentPath)) {
+                    // if an exclude pattern matches, remove the parentPath
+                    it.remove();
+                }
+            }
+        }
+        // note that cut-off paths are not applied with excludes,
+        // eg if excludePaths contains /var/foo/bar and path contains /var/foo
+        // with a maxPathLevel of 2, that might very well mean that
+        // the actual path would have been /var/foo/bar, but we don't know.
+        // so we cannot exclude it here and thus have a potential false negative
+        // (ie we didn't exclude it in the prefilter)
+
+        // now remainingPaths contains what is not excluded,
+        // then check if it is included
+        boolean included = false;
+        for (String aPath : parentPaths) {
+            // direct set contains is fastest, lets try that first
+            if (this.rootIncludePaths.contains(aPath)) {
+                included = true;
+                break;
+            }
+            if (patternsMatch(this.includePathPatterns, aPath)) {
+                included = true;
+                break;
+            }
+        }
+
+        if (!included) {
+            // well then we can definitely say that this commit is excluded
+            return true;
+        }
+
+        if (this.propertyNames != null && this.propertyNames.size() != 0) {
+            included = false;
+            for (String aProperty : changeSet.getPropertyNames()) {
+                if (this.propertyNames.contains(aProperty)) {
+                    included = true;
+                    break;
+                }
+            }
+            // if propertyNames are defined then if we can't find any
+            // at this stage (if !included) then this equals to filtering out
+            if (!included) {
+                return true;
+            }
+            // otherwise we have found a match, but one of the
+            // nodeType/nodeNames
+            // could still filter out, so we have to continue...
+        }
+
+        if (this.parentNodeTypes != null && this.parentNodeTypes.size() != 0) {
+            included = false;
+            for (String aNodeType : changeSet.getParentNodeTypes()) {
+                if (this.parentNodeTypes.contains(aNodeType)) {
+                    included = true;
+                    break;
+                }
+            }
+            // same story here: if nodeTypes is defined and we can't find any
+            // match
+            // then we're done now
+            if (!included) {
+                return true;
+            }
+            // otherwise, again, continue
+        }
+
+        if (this.parentNodeNames != null && this.parentNodeNames.size() != 0) {
+            included = false;
+            for (String aNodeName : changeSet.getParentNodeNames()) {
+                if (this.parentNodeNames.contains(aNodeName)) {
+                    included = true;
+                    break;
+                }
+            }
+            // and a 3rd time, if we can't find any nodeName match
+            // here, then we're filtering out
+            if (!included) {
+                return true;
+            }
+        }
+
+        // at this stage we haven't found any exclude, so we're likely including
+        return false;
+    }
+
+    static boolean prefixMatch(Set<String> pathsWithGlobs, String pathPrefix) {
+        for (String aPathWithGlobs : pathsWithGlobs) {
+            if (aPathWithGlobs.startsWith(pathPrefix)) {
+                return true;
+            }
+        }
+        for (String aPathWithGlobs : pathsWithGlobs) {
+            if (isPrefix(aPathWithGlobs, pathPrefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static boolean isPrefix(String aPathWithGlobs, String pathPrefix) {
+        final Iterator<String> it = elements(pathPrefix).iterator();
+        final Iterator<String> globIt = elements(aPathWithGlobs).iterator();
+        while (it.hasNext()) {
+            final String pathElem = it.next();
+            if (!globIt.hasNext()) {
+                return false;
+            }
+            String aGlobElem = globIt.next();
+            if (aGlobElem.equals(pathElem) || aGlobElem.equals("*")) {
+                // perfect
+                continue;
+            } else if (!aGlobElem.equals("**")) {
+                // mismatch
+                return false;
+            } else {
+                // ** case
+                // this means that the 'aPathWithGlobs' would now
+                // accept anything at this level and below - and since
+                // pathPrefix is only a prefix it is indeed possible
+                // that whatever comes after the ** in the glob
+                // actually shows up.
+                // which means: this is a match
+                return true;
+            }
+        }
+        // if we've gone through all elements in the path
+        // and haven't hit a conclusiive true/false case,
+        // then this is the prefix case, ie a match
+        return true;
+    }
+
+    private static boolean patternsMatch(Set<Pattern> pathPatterns, String path) {
+        if (path == null) {
+            return false;
+        }
+        for (Pattern pathPattern : pathPatterns) {
+            if (pathPattern.matcher(path).matches()) {
+                return true;
+            }
+        }
+        return false;
+    }
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterImpl.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterImplTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterImplTest.java	(revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterImplTest.java	(working copy)
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSetBuilder;
+import org.junit.Test;
+
+public class PrefilterImplTest {
+
+    private void doTestPrefixesAny(String prefixPath, boolean expectPrefixes, String... globPaths) {
+        Set<String> paths = new HashSet<String>(Arrays.asList(globPaths));
+        boolean result = PrefilterImpl.prefixMatch(paths, prefixPath);
+        assertEquals(expectPrefixes, result);
+    }
+
+    @Test
+    public void testPrefixMatch() throws Exception {
+        doTestPrefixesAny("/a/b/c", false, "/a/b");
+        doTestPrefixesAny("/a/b/c", true,  "/a/b/c");
+        doTestPrefixesAny("/a/b/c", true,  "/a/b/c/d");
+        doTestPrefixesAny("/a/b/c", true,  "/a/b/c/d", "/a/b");
+        doTestPrefixesAny("/a/b/c", true,  "/a/b/c/d", "/a/b/c");
+        doTestPrefixesAny("/a/b/c", false, "/a/*");
+        doTestPrefixesAny("/a/b/c", true,  "/a/**");
+        doTestPrefixesAny("/a/b/c", true, "/a/b/*");
+        doTestPrefixesAny("/a/b/c", true, "/**");
+        doTestPrefixesAny("/a/b", true, "/**");
+        doTestPrefixesAny("/a", true, "/**");
+        doTestPrefixesAny("/", true, "/**");
+    }
+    
+    /** shortcut for creating a set of strings */
+    private Set<String> s(String... entries) {
+        return new HashSet<String>(Arrays.asList(entries));
+    }
+    
+    private ChangeSet newChangeSet(int maxPathDepth, Set<String> parentPaths,
+            Set<String> parentNodeNames,
+            Set<String> parentNodeTypes,
+            Set<String> propertyNames) {
+        ChangeSetBuilder changeSetBuilder = new ChangeSetBuilder(Integer.MAX_VALUE, maxPathDepth);
+        changeSetBuilder.getParentPaths().addAll(parentPaths);
+        changeSetBuilder.getParentNodeNames().addAll(parentNodeNames);
+        changeSetBuilder.getParentNodeTypes().addAll(parentNodeTypes);
+        changeSetBuilder.getPropertyNames().addAll(propertyNames);
+        return changeSetBuilder.build();
+    }
+
+    @Test
+    public void testIsDeepFalse() throws Exception {
+        PrefilterImpl prefilter = new PrefilterImpl(s("/"), false, s("/excluded"), s(), s(), s());
+        
+        assertTrue(prefilter.excludeCommit(newChangeSet(5, s("/child1", "/child2"), s("child1", "child2"), s(), s())));
+        assertFalse(prefilter.excludeCommit(newChangeSet(5, s("/", "/child2"), s("child2"), s(), s())));
+    }
+
+    @Test
+    public void testParentPathsIncludeExclude() throws Exception {
+        PrefilterImpl prefilter = new PrefilterImpl(s("/"), true, s("/excluded"), s(), s(), s());
+        assertFalse(prefilter.excludeCommit(newChangeSet(5, s("/a", "/b"), s("a", "b"), s(), s())));
+        assertTrue(prefilter.excludeCommit(newChangeSet(5, s("/excluded/foo", "/excluded/bar"), s("foo", "bar"), s(), s())));
+        
+        prefilter = new PrefilterImpl(s("/included"), true, s("/excluded"), s(), s(), s());
+        assertTrue(prefilter.excludeCommit(newChangeSet(5, s("/a", "/b"), s(), s(), s())));
+        assertFalse(prefilter.excludeCommit(newChangeSet(5, s("/included/a", "/included/b"), s(), s(), s())));
+        assertTrue(prefilter.excludeCommit(newChangeSet(5, s("/excluded/foo", "/excluded/bar"), s(), s(), s())));
+
+        prefilter = new PrefilterImpl(s("/foo/**/included"), true, s("/excluded"), s(), s(), s());
+        assertTrue(prefilter.excludeCommit(newChangeSet(5, s("/a", "/b"), s(), s(), s())));
+        assertTrue(prefilter.excludeCommit(newChangeSet(5, s("/included/a", "/included/b"), s(), s(), s())));
+        assertFalse(prefilter.excludeCommit(newChangeSet(5, s("/foo/included/a", "/included/b"), s(), s(), s())));
+        assertFalse(prefilter.excludeCommit(newChangeSet(5, s("/foo/bar/included/a", "/included/b"), s(), s(), s())));
+        assertTrue(prefilter.excludeCommit(newChangeSet(5, s("/excluded/foo", "/excluded/bar"), s(), s(), s())));
+
+        prefilter = new PrefilterImpl(s("/main/**/included"), true, s("/main/excluded"), s(), s(), s());
+        assertTrue(prefilter.excludeCommit(newChangeSet(5, s("/main", "/main/foo"), s(), s(), s())));
+        assertFalse(prefilter.excludeCommit(newChangeSet(5, s("/main/included", "/main/excluded"), s(), s(), s())));
+        assertTrue(prefilter.excludeCommit(newChangeSet(5, s("/main/excluded/included", "/main/excluded"), s(), s(), s())));
+    }
+    
+    @Test
+    public void testParentNodeNames() throws Exception {
+        PrefilterImpl prefilter = new PrefilterImpl(s("/"), true, s(), s("foo", "bar"), s(), s());
+        assertFalse(prefilter.excludeCommit(newChangeSet(5, s("/a/foo", "/b"), s("foo", "b"), s(), s())));
+        assertTrue(prefilter.excludeCommit(newChangeSet(5, s("/a/zoo", "/b"), s("zoo", "b"), s(), s())));
+        assertFalse(prefilter.excludeCommit(newChangeSet(5, s("/a/zoo", "/bar"), s("zoo", "bar"), s(), s())));
+    }
+
+    @Test
+    public void testParentNodeTypes() throws Exception {
+        PrefilterImpl prefilter = new PrefilterImpl(s("/"), true, s(), s(), s("nt:folder"), s());
+        assertTrue(prefilter.excludeCommit(newChangeSet(5, s("/a"), s("a"), s("nt:unstructured"), s())));
+        assertFalse(prefilter.excludeCommit(newChangeSet(5, s("/a"), s("a"), s("nt:folder"), s())));
+    }
+
+    @Test
+    public void testPropertyNames() throws Exception {
+        PrefilterImpl prefilter = new PrefilterImpl(s("/"), true, s(), s(), s(), s("jcr:data"));
+        assertTrue(prefilter.excludeCommit(newChangeSet(5, s("/a"), s("a"), s(), s("myProperty"))));
+        assertFalse(prefilter.excludeCommit(newChangeSet(5, s("/a"), s("a"), s(), s("jcr:data"))));
+    }
+
+}

Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterImplTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java	(revision 1763448)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java	(working copy)
@@ -45,6 +45,7 @@
 import org.apache.jackrabbit.oak.plugins.name.NamespaceEditorProvider;
 import org.apache.jackrabbit.oak.plugins.nodetype.TypeEditorProvider;
 import org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
 import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
 import org.apache.jackrabbit.oak.plugins.version.VersionHook;
 import org.apache.jackrabbit.oak.query.QueryEngineSettings;
@@ -120,6 +121,7 @@
             with(new NamespaceEditorProvider());
             with(new TypeEditorProvider());
             with(new ConflictValidatorProvider());
+            with(new ChangeCollectorProvider());
 
             with(new ReferenceEditorProvider());
             with(new ReferenceIndexProvider());
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java	(revision 1763448)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java	(working copy)
@@ -44,15 +44,20 @@
 import org.apache.jackrabbit.commons.observation.ListenerTracker;
 import org.apache.jackrabbit.oak.api.ContentSession;
 import org.apache.jackrabbit.oak.namepath.NamePathMapper;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
 import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
 import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterConfigMBean;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
 import org.apache.jackrabbit.oak.plugins.observation.filter.Filters;
+import org.apache.jackrabbit.oak.plugins.observation.filter.Prefilter;
 import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
 import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
-import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.commit.FilteringAwareObserver;
+import org.apache.jackrabbit.oak.spi.commit.FilteringObserver;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
 import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
@@ -73,11 +78,27 @@
  * After instantiation a {@code ChangeProcessor} must be started in order to start
  * delivering observation events and stopped to stop doing so.
  */
-class ChangeProcessor implements Observer {
+class ChangeProcessor implements FilteringAwareObserver {
     private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);
     private static final PerfLogger PERF_LOGGER = new PerfLogger(
             LoggerFactory.getLogger(ChangeProcessor.class.getName() + ".perf"));
 
+    private static enum FilterResult {
+        /** marks a commit as to be included, ie delivered.
+         * It's okay to falsely mark a commit as included,
+         * since filtering (as part of converting to events)
+         * will be applied at a later stage again. */
+        INCLUDE,
+        /** mark a commit as not of interest to this ChangeProcessor.
+         * Exclusion is definite, ie it's not okay to falsely
+         * mark a commit as excluded */
+        EXCLUDE, 
+        /** mark a commit as included but indicate that this
+         * is not a result of prefiltering but that prefiltering
+         * was skipped/not applicable for some reason */
+        PREFILTERING_SKIPPED
+    }
+    
     /**
      * Fill ratio of the revision queue at which commits should be delayed
      * (conditional of {@code commitRateLimiter} being non {@code null}).
@@ -89,7 +110,12 @@
      * kicks in.
      */
     public static final int MAX_DELAY;
-
+    
+    /** The test mode can be used to just verify if prefiltering would have
+     * correctly done its job and warn if that's not the case.
+     */
+    private static final boolean PREFILTERING_TESTMODE;
+    
     // OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using System.properties for now
     static {
         final String delayThresholdStr = System.getProperty("oak.commitRateLimiter.delayThreshold");
@@ -114,6 +140,18 @@
         }
         DELAY_THRESHOLD = delayThreshold;
         MAX_DELAY = maxDelay;
+
+        final String prefilteringTestModeStr = System.getProperty("oak.observation.prefilteringTestMode");
+        boolean prefilteringTestModeBool = true; // default is enabled==true
+        try {
+            if (prefilteringTestModeStr != null && prefilteringTestModeStr.length() != 0) {
+                prefilteringTestModeBool = Boolean.parseBoolean(prefilteringTestModeStr);
+                LOG.info("<clinit> using oak.observation.prefilteringTestMode = " + prefilteringTestModeBool);
+            }
+        } catch(RuntimeException e) {
+            LOG.warn("<clinit> could not parse oak.observation.prefilteringTestMode, using default (" + prefilteringTestModeBool + "): " + e, e);
+        }
+        PREFILTERING_TESTMODE = prefilteringTestModeBool;
     }
     
     private static final AtomicInteger COUNTER = new AtomicInteger();
@@ -147,6 +185,22 @@
 
     private volatile NodeState previousRoot;
 
+    /**
+     * for statistics: tracks how many times prefiltering excluded a commit
+     */
+    private int prefilterExcludeCount;
+    
+    /**
+     * for statistics: tracks how many times prefiltering included a commit
+     */
+    private int prefilterIncludeCount;
+    
+    /**
+     * for statistics: tracks how many times prefiltering was ignored (not evaluated at all),
+     * either because it was disabled, queue too small, CommitInfo null or CommitContext null
+     */
+    private int prefilterSkipCount;
+    
     public ChangeProcessor(
             ContentSession contentSession,
             NamePathMapper namePathMapper,
@@ -175,6 +229,29 @@
         filterProvider.set(filter);
     }
 
+    @Nonnull
+    public ChangeProcessorMBean getMBean() {
+        return new ChangeProcessorMBean() {
+
+            @Override
+            public int getPrefilterExcludeCount() {
+                return prefilterExcludeCount;
+            }
+
+            @Override
+            public int getPrefilterIncludeCount() {
+                return prefilterIncludeCount;
+            }
+
+            @Override
+            public int getPrefilterSkipCount() {
+                return prefilterSkipCount;
+            }
+
+        };
+    }
+
+    
     /**
      * Start this change processor
      * @param whiteboard  the whiteboard instance to used for scheduling individual
@@ -185,16 +262,19 @@
         checkState(registration == null, "Change processor started already");
         final WhiteboardExecutor executor = new WhiteboardExecutor();
         executor.start(whiteboard);
-        final BackgroundObserver observer = createObserver(executor);
+        final BackgroundObserver backgroundObserver = createObserver(executor);
+        final FilteringObserver filteringObserver = createFilteringObserver(backgroundObserver);
         listenerId = COUNTER.incrementAndGet() + "";
         Map<String, String> attrs = ImmutableMap.of(LISTENER_ID, listenerId);
         String name = tracker.toString();
         registration = new CompositeRegistration(
-            registerObserver(whiteboard, observer),
+            registerObserver(whiteboard, filteringObserver),
             registerMBean(whiteboard, EventListenerMBean.class,
                     tracker.getListenerMBean(), "EventListener", name, attrs),
             registerMBean(whiteboard, BackgroundObserverMBean.class,
-                    observer.getMBean(), BackgroundObserverMBean.TYPE, name, attrs),
+                    backgroundObserver.getMBean(), BackgroundObserverMBean.TYPE, name, attrs),
+            registerMBean(whiteboard, ChangeProcessorMBean.class,
+                    getMBean(), ChangeProcessorMBean.TYPE, name, attrs),
             //TODO If FilterProvider gets changed later then MBean would need to be
             // re-registered
             registerMBean(whiteboard, FilterConfigMBean.class,
@@ -202,7 +282,7 @@
             new Registration() {
                 @Override
                 public void unregister() {
-                    observer.close();
+                    backgroundObserver.close();
                 }
             },
             new Registration() {
@@ -279,9 +359,49 @@
                 maxQueueLength.recordValue(queueSize);
                 tracker.recordQueueLength(queueSize, created);
             }
+            
+            @Override
+            public String toString() {
+                return "Prefiltering BackgroundObserver for "+ChangeProcessor.this;
+            }
         };
     }
 
+    
+    private FilteringObserver createFilteringObserver(BackgroundObserver bo) {
+        return new FilteringObserver(bo) {
+    
+            @Override
+            protected boolean isExcluded(NodeState before, NodeState after, CommitInfo info) {
+                if (PREFILTERING_TESTMODE) {
+                    // then we don't prefilter but only test later
+                    prefilterSkipCount++;
+                    return false;
+                }
+                final FilterResult filterResult = evalPrefilter(before, after, info);
+                switch (filterResult) {
+                case PREFILTERING_SKIPPED: {
+                    prefilterSkipCount++;
+                    return false;
+                }
+                case EXCLUDE: {
+                    prefilterExcludeCount++;
+                    return false;
+                }
+                case INCLUDE: {
+                    prefilterIncludeCount++;
+                    return true;
+                }
+                default: {
+                    LOG.info("isExcluded: unknown/unsupported filter result: " + filterResult);
+                    prefilterSkipCount++;
+                    return false;
+                }
+                }
+            }
+        };
+    }
+
     private final Monitor runningMonitor = new Monitor();
     private final RunningGuard running = new RunningGuard(runningMonitor);
 
@@ -332,11 +452,32 @@
     }
 
     @Override
+    public void resetPreviousRoot(NodeState root) {
+        previousRoot = root;
+    }
+    
+    @Override
     public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
+        FilterResult prefilterTestResult = null;
+        if (PREFILTERING_TESTMODE) {
+            // OAK-4908 test mode: when the ChangeCollectorProvider is enabled
+            // there is the option to have the ChangeProcessors run in so-called
+            // 'test mode'. In this test mode the prefiltering is not applied,
+            // but instead verified if it *would have prefiltered correctly*.
+            // that test is therefore done at dequeue-time, hence in
+            // contentChanged
+            // TODO: remove this testing mechanism after a while
+            try {
+                prefilterTestResult = evalPrefilter(previousRoot, root, info);
+            } catch (Exception e) {
+                LOG.warn("contentChanged: exception in wouldBeExcludedCommit: " + e, e);
+            }
+        }
         if (previousRoot != null) {
             try {
                 long start = PERF_LOGGER.start();
                 FilterProvider provider = filterProvider.get();
+                boolean onEventInvoked = false;
                 // FIXME don't rely on toString for session id
                 if (provider.includeCommit(contentSession.toString(), info)) {
                     EventFilter filter = provider.getFilter(previousRoot, root);
@@ -349,6 +490,7 @@
                     if (hasEvents && runningMonitor.enterIf(running)) {
                         try {
                             CountingIterator countingEvents = new CountingIterator(events);
+                            onEventInvoked = true;
                             eventListener.onEvent(countingEvents);
                             countingEvents.updateCounters(eventCount, eventDuration);
                         } finally {
@@ -356,6 +498,26 @@
                         }
                     }
                 }
+                if (prefilterTestResult != null) {
+                    // OAK-4908 test mode
+                    if (prefilterTestResult == FilterResult.EXCLUDE && onEventInvoked) {
+                        // this is not ok, an event would have gotten
+                        // excluded-by-prefiltering even though
+                        // it actually got an event.
+                        LOG.warn("contentChanged: delivering event which would have been prefiltered, "
+                                + "info={}, this={}, listener={}", info, this, eventListener);
+                    } else if (prefilterTestResult == FilterResult.INCLUDE && !onEventInvoked && info != null
+                            && info != CommitInfo.EMPTY) {
+                        // this can occur arbitrarily frequent. as prefiltering
+                        // is not perfect, it can
+                        // have false negatives - ie it can include even though
+                        // no event is then created
+                        // hence we can only really log at debug here
+                        LOG.debug(
+                                "contentChanged: no event to deliver but not prefiltered, info={}, this={}, listener={}",
+                                info, this, eventListener);
+                    }
+                }
                 PERF_LOGGER.end(start, 100,
                         "Generated events (before: {}, after: {})",
                         previousRoot, root);
@@ -475,4 +637,57 @@
                 + ", commitRateLimiter=" + commitRateLimiter
                 + ", running=" + running.isSatisfied() + "]";
     }
+
+    /**
+     * Evaluate the prefilter for a given commit.
+     * 
+     * @return a FilterResult indicating either inclusion, exclusion or
+     *         inclusion-due-to-skipping. The latter is used to reflect
+     *         prefilter evaluation better in statistics (as it could also have
+     *         been reported just as include)
+     */
+    private FilterResult evalPrefilter(NodeState before, NodeState after, CommitInfo info) {
+        if (info == null) {
+            return FilterResult.PREFILTERING_SKIPPED;
+        }
+        final Map<String, Object> m = info.getInfo();
+        // info map is non-null
+        final CommitContext commitAttributes = (CommitContext) m.get(CommitContext.NAME);
+        if (commitAttributes == null) {
+            return FilterResult.PREFILTERING_SKIPPED;
+        }
+        if (before == null || after == null) {
+            // likely only occurs at startup
+            // we can't do any diffing etc, so just not exclude it
+            return FilterResult.PREFILTERING_SKIPPED;
+        }
+
+        final FilterProvider fp = filterProvider.get();
+        // FIXME don't rely on toString for session id
+        if (!fp.includeCommit(contentSession.toString(), info)) {
+            // 'classic' (and cheap pre-) filtering
+            return FilterResult.EXCLUDE;
+        }
+        // if the 'oak.isFiltered' flag is set in the
+        // 'oak.commitAttributes'
+        // we skip this entry in the ChangeProcessor.
+        // this results in a 'skip' of changes and ensures the
+        // 'previousRoot' is again set correctly (see below in else)
+        final ChangeSet prefilterChangeSet = (ChangeSet) commitAttributes
+                .get(ChangeCollectorProvider.COMMIT_CONTEXT_OBSERVATION_CHANGESET);
+        if (prefilterChangeSet == null) {
+            // then can't do any prefiltering since it was not
+            // able to complete the sets (within the given boundaries)
+            // (this corresponds to a large commit, which thus can't
+            // go through prefiltering)
+            return FilterResult.PREFILTERING_SKIPPED;
+        }
+
+        final Prefilter prefilter = fp;
+        if (prefilter.excludeCommit(prefilterChangeSet)) {
+            return FilterResult.EXCLUDE;
+        } else {
+            return FilterResult.INCLUDE;
+        }
+    }
 }
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java	(revision 0)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java	(working copy)
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.jcr.observation;
+
+public interface ChangeProcessorMBean {
+    String TYPE = "ChangeProcessorStats";
+
+    /** Returns the number of commits that were excluded by the prefiltering mechanism */
+    int getPrefilterExcludeCount();
+
+    /** Returns the number of commits that were included by the prefiltering mechanism */
+    int getPrefilterIncludeCount();
+    
+    /** Returns the number of commits that skipped prefiltering, thus got included */
+    int getPrefilterSkipCount();
+}

Property changes on: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java	(revision 1763448)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java	(working copy)
@@ -84,6 +84,12 @@
                 referenceInterface = BackgroundObserverMBean.class,
                 policy = ReferencePolicy.DYNAMIC,
                 cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
+        @Reference(name = "changeProcessorMBean",
+                bind = "bindChangeProcessorMBean",
+                unbind = "unbindChangeProcessorMBean",
+                referenceInterface = ChangeProcessorMBean.class,
+                policy = ReferencePolicy.DYNAMIC,
+                cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
         @Reference(name = "filterConfigMBean",
                 bind = "bindFilterConfigMBean",
                 unbind = "unbindFilterConfigMBean",
@@ -96,6 +102,7 @@
     private final AtomicInteger observerCount = new AtomicInteger();
     private final Map<ObjectName, EventListenerMBean> eventListeners = Maps.newConcurrentMap();
     private final Map<ObjectName, BackgroundObserverMBean> bgObservers = Maps.newConcurrentMap();
+    private final Map<ObjectName, ChangeProcessorMBean> changeProcessors = Maps.newConcurrentMap();
     private final Map<ObjectName, FilterConfigMBean> filterConfigs = Maps.newConcurrentMap();
 
     private Registration mbeanReg;
@@ -201,6 +208,11 @@
                     m.observerMBean = ef.getValue();
                 }
             }
+            for (Map.Entry<ObjectName, ChangeProcessorMBean> ef : changeProcessors.entrySet()){
+                if (Objects.equal(getListenerId(ef.getKey()), listenerId)){
+                    m.changeProcessorMBean = ef.getValue();
+                }
+            }
             mbeans.add(m);
         }
         return mbeans;
@@ -249,6 +261,16 @@
     }
 
     @SuppressWarnings("unused")
+    protected void bindChangeProcessorMBean(ChangeProcessorMBean mbean, Map<String, ?> config){
+    	changeProcessors.put(getObjectName(config), mbean);
+    }
+
+    @SuppressWarnings("unused")
+    protected void unbindChangeProcessorMBean(ChangeProcessorMBean mbean, Map<String, ?> config){
+    	changeProcessors.remove(getObjectName(config));
+    }
+
+    @SuppressWarnings("unused")
     protected void bindListenerMBean(EventListenerMBean mbean, Map<String, ?> config){
         eventListeners.put(getObjectName(config), mbean);
     }
@@ -280,6 +302,7 @@
     private static class ListenerMBeans {
         EventListenerMBean eventListenerMBean;
         BackgroundObserverMBean observerMBean;
+        ChangeProcessorMBean changeProcessorMBean;
         FilterConfigMBean filterConfigMBean;
     }
 
@@ -300,6 +323,9 @@
                 "ratioOfTimeSpentProcessingEvents",
                 "eventConsumerTimeRatio",
                 "queueBacklogMillis",
+                "prefilterSkips",
+                "prefilterExcludes",
+                "prefilterIncludes",
                 "queueSize",
                 "localEventCount",
                 "externalEventCount",
@@ -329,6 +355,9 @@
                 SimpleType.INTEGER,
                 SimpleType.INTEGER,
                 SimpleType.INTEGER,
+                SimpleType.INTEGER,
+                SimpleType.INTEGER,
+                SimpleType.INTEGER,
                 SimpleType.STRING,
                 SimpleType.BOOLEAN,
                 SimpleType.BOOLEAN,
@@ -373,6 +402,9 @@
                     mbeans.eventListenerMBean.getRatioOfTimeSpentProcessingEvents(),
                     mbeans.eventListenerMBean.getEventConsumerTimeRatio(),
                     mbeans.eventListenerMBean.getQueueBacklogMillis(),
+                    mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterSkipCount(),
+                    mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterExcludeCount(),
+                    mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterIncludeCount(),
                     mbeans.observerMBean.getQueueSize(),
                     mbeans.observerMBean.getLocalEventCount(),
                     mbeans.observerMBean.getExternalEventCount(),
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java	(revision 1763448)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java	(working copy)
@@ -58,6 +58,7 @@
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterBuilder.Condition;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
 import org.apache.jackrabbit.oak.plugins.observation.filter.PermissionProviderFactory;
+import org.apache.jackrabbit.oak.plugins.observation.filter.PrefilterImpl;
 import org.apache.jackrabbit.oak.plugins.observation.filter.Selectors;
 import org.apache.jackrabbit.oak.spi.commit.Observable;
 import org.apache.jackrabbit.oak.spi.security.authorization.AuthorizationConfiguration;
@@ -238,7 +239,8 @@
 
         List<Condition> excludeConditions = createExclusions(filterBuilder, excludedPaths);
 
-        filterBuilder
+        final String[] validatedNodeTypeNames = validateNodeTypeNames(nodeTypeName);
+		filterBuilder
             .includeSessionLocal(!noLocal)
             .includeClusterExternal(!noExternal)
             .includeClusterLocal(!noInternal)
@@ -249,13 +251,19 @@
                     filterBuilder.moveSubtree(),
                     filterBuilder.eventType(eventTypes),
                     filterBuilder.uuid(Selectors.PARENT, uuids),
-                    filterBuilder.nodeType(Selectors.PARENT, validateNodeTypeNames(nodeTypeName)),
+                    filterBuilder.nodeType(Selectors.PARENT, validatedNodeTypeNames),
                     filterBuilder.accessControl(permissionProviderFactory)));
 
         // FIXME support multiple path in ListenerTracker
         ListenerTracker tracker = new WarningListenerTracker(
                 !noExternal, listener, eventTypes, absPath, isDeep, uuids, nodeTypeName, noLocal);
 
+        // OAK-4908 : prefiltering support. here we have explicit yes/no/maybe filtering
+        // for things like propertyNames/nodeTypes/nodeNames/paths which cannot be 
+        // applied on the full-fledged filterBuilder above but requires an explicit 'prefilter' for that.
+        filterBuilder.setPrefilter(new PrefilterImpl(includePaths, isDeep, excludedPaths, null,
+                validatedNodeTypeNames == null ? null : newHashSet(validatedNodeTypeNames), null));
+        
         addEventListener(listener, tracker, filterBuilder.build());
     }
 
Index: oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
===================================================================
--- oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java	(revision 1763448)
+++ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java	(working copy)
@@ -95,7 +95,7 @@
     private static final String REFERENCEABLE_NODE = "\"referenceable\"";
     private static final String TEST_PATH = '/' + TEST_NODE;
     private static final String TEST_TYPE = "mix:test";
-    public static final int TIME_OUT = 60;
+    public static final int TIME_OUT = 6;
 
     private Session observingSession;
     private ObservationManager observationManager;
@@ -431,7 +431,25 @@
             observationManager.removeEventListener(listener);
         }
     }
+    
+    @Test
+    public void propertyFilter() throws Exception {
+        Node root = getNode("/");
+        ExpectationListener listener = new ExpectationListener();
+        observationManager.addEventListener(listener, PROPERTY_ADDED, "/a/b", false, null, null, false);
+        Node a = root.addNode("a");
+        Node b = a.addNode("b");
+        listener.expect("/a/b/jcr:primaryType", PROPERTY_ADDED);
 
+        listener.expectAdd(b.setProperty("propName", 1));
+    	root.getSession().save();
+
+    	List<Expectation> missing = listener.getMissing(TIME_OUT, TimeUnit.SECONDS);
+        assertTrue("Missing events: " + missing, missing.isEmpty());
+        List<Event> unexpected = listener.getUnexpected();
+        assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+    }
+    
     @Test
     public void pathFilter() throws Exception {
         final String path = "/events/only/here";
