Index: oak-core/src/main/java/org/apache/jackrabbit/oak/core/SimpleCommitContext.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/core/SimpleCommitContext.java	(revision 1763448)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/core/SimpleCommitContext.java	(working copy)
@@ -30,6 +30,11 @@
     private final Map<String, Object> attrs = Maps.newHashMap();
 
     @Override
+    public String toString() {
+    	return "CommitContext[attrs="+attrs+"]";
+    }
+    
+    @Override
     public void set(String name, Object value) {
         attrs.put(checkNotNull(name), value);
     }
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java	(revision 1763448)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java	(working copy)
@@ -59,11 +59,18 @@
     private boolean includeClusterLocal = true;
     private final List<String> subTrees = newArrayList();
     private Condition condition = includeAll();
+    private Prefilter prefilter;
 
     public interface Condition {
         @Nonnull
         EventFilter createFilter(@Nonnull NodeState before, @Nonnull NodeState after);
     }
+    
+    @Nonnull
+    public FilterBuilder setPrefilter(Prefilter prefilter) {
+    	this.prefilter = prefilter;
+    	return this;
+    }
 
     /**
      * Adds a path to the set of paths whose subtrees include all events of
@@ -372,6 +379,7 @@
             final boolean includeClusterLocal = FilterBuilder.this.includeClusterLocal;
             final Iterable<String> subTrees = FilterBuilder.this.getSubTrees();
             final Condition condition = FilterBuilder.this.condition;
+            final Prefilter prefilter = FilterBuilder.this.prefilter;
 
             @Override
             public boolean includeCommit(@Nonnull String sessionId, @CheckForNull CommitInfo info) {
@@ -404,6 +412,15 @@
             private boolean isExternal(CommitInfo info) {
                 return info == null;
             }
+            
+            @Override
+            public boolean excludeCommit(PrefilterChangeSet prefilterChangeSet) {
+            	if (prefilter != null) {
+            		return prefilter.excludeCommit(prefilterChangeSet);
+            	} else {
+            		return false;
+            	}
+            }
         };
     }
 
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java	(revision 1763448)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java	(working copy)
@@ -29,7 +29,7 @@
  * Instance of this class provide a {@link EventFilter} for observation
  * events and a filter for commits.
  */
-public interface FilterProvider {
+public interface FilterProvider extends Prefilter {
 
     /**
      * Filter whole commits. Only commits for which this method returns
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/Prefilter.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/Prefilter.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/Prefilter.java	(working copy)
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+public interface Prefilter {
+
+	boolean excludeCommit(PrefilterChangeSet prefilterChangeSet);
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/Prefilter.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterChangeSet.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterChangeSet.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterChangeSet.java	(working copy)
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+public class PrefilterChangeSet {
+	
+	private final boolean immutable;
+	
+	private final int maxPrefilterPathDepth;
+	private final Set<String> paths;
+	private final Set<String> propertyNames;
+	private final Set<String> nodeTypes;
+	private final Set<String> nodeNames;
+	
+	private boolean overflow;
+	
+	public PrefilterChangeSet(int maxPrefilterPathDepth) {
+		this(false, maxPrefilterPathDepth, new HashSet<String>(), new HashSet<String>(),
+				new HashSet<String>(), new HashSet<String>());
+	}
+	
+	private PrefilterChangeSet(boolean immutable, int maxPrefilterPathDepth,
+			Set<String> paths,
+			Set<String> propertyNames,
+			Set<String> nodeTypes,
+			Set<String> nodeNames) {
+		this.immutable = immutable;
+		this.maxPrefilterPathDepth = maxPrefilterPathDepth;
+		this.paths = paths;
+		this.propertyNames = propertyNames;
+		this.nodeTypes = nodeTypes;
+		this.nodeNames = nodeNames;
+	}
+	
+	@Override
+	public String toString() {
+		return "PrefilterChangeSet[paths="+paths+", propertyNames="+propertyNames+", nodeNames="+nodeNames+", nodeTypes="+nodeTypes+"]";
+	}
+	
+	public Set<String> getPropertyNames() {
+		return propertyNames;
+	}
+	
+	public Set<String> getNodeTypes() {
+		return nodeTypes;
+	}
+	
+	public Set<String> getNodeNames() {
+		return nodeNames;
+	}
+	
+	public Set<String> getPaths() {
+		return paths;
+	}
+
+	public void onOverflow() {
+		if (immutable) {
+			throw new IllegalStateException("cannot call onOverflow when immutable");
+		}
+		overflow = true;
+		paths.clear();
+		propertyNames.clear();
+		nodeNames.clear();
+		nodeTypes.clear();
+	}
+
+	public boolean getOverflow() {
+		return overflow;
+	}
+	
+	public int getMaxPrefilterPathDepth() {
+		return maxPrefilterPathDepth;
+	}
+	
+	public PrefilterChangeSet asImmutable() {
+		return new PrefilterChangeSet(true, maxPrefilterPathDepth,
+				ImmutableSet.copyOf(propertyNames),
+				ImmutableSet.copyOf(propertyNames),
+				ImmutableSet.copyOf(nodeNames),
+				ImmutableSet.copyOf(nodeTypes));
+	}
+}
\ No newline at end of file

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterChangeSet.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterCollectorValidatorProvider.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterCollectorValidatorProvider.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterCollectorValidatorProvider.java	(working copy)
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static org.apache.jackrabbit.oak.commons.PathUtils.getName;
+
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Validator;
+import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+@Component(immediate = true)
+@Property(name = "type", value = "prefilterSupportValidator", propertyPrivate = true)
+@Service(ValidatorProvider.class)
+public class PrefilterCollectorValidatorProvider extends ValidatorProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrefilterCollectorValidatorProvider.class);
+
+    public static final String COMMIT_CONTEXT_OBSERVER_PREFILTERING_CHANGESET = "oak.observation.prefiltering.changeSet";
+
+    public static final boolean PREFILTER_SUPPORT_ENABLED;
+    public static final boolean PREFILTER_SUPPORT_TEST_ENABLED;
+    private static final int MAX_PREFILTERED_ITEMS;
+    private static final int MAX_PREFILTER_PATH_DEPTH;
+
+    static {
+        final String prefilteringEnabledStr = System.getProperty("oak.observation.prefilteringEnabled");
+        boolean prefilteringEnabledBool = true; // default is enabled==true
+        try {
+            if (prefilteringEnabledStr != null && prefilteringEnabledStr.length() != 0) {
+                prefilteringEnabledBool = Boolean.parseBoolean(prefilteringEnabledStr);
+                LOG.info("<clinit> using oak.observation.prefilteringEnabled = " + prefilteringEnabledBool);
+            }
+        } catch(RuntimeException e) {
+            LOG.warn("<clinit> could not parse oak.observation.prefilteringEnabled, using default (" + prefilteringEnabledBool + "): " + e, e);
+        }
+        PREFILTER_SUPPORT_ENABLED = prefilteringEnabledBool;
+
+        final String prefilteringTestEnabledStr = System.getProperty("oak.observation.prefilteringTestEnabled");
+        boolean prefilteringTestEnabledBool = false; // default is enabled==false
+        try {
+            if (prefilteringTestEnabledStr != null && prefilteringTestEnabledStr.length() != 0) {
+                prefilteringTestEnabledBool = Boolean.parseBoolean(prefilteringTestEnabledStr);
+                LOG.info("<clinit> using oak.observation.prefilteringTestEnabled = " + prefilteringTestEnabledBool);
+            }
+        } catch(RuntimeException e) {
+            LOG.warn("<clinit> could not parse oak.observation.prefilteringTestEnabled, using default (" + prefilteringTestEnabledBool + "): " + e, e);
+        }
+        PREFILTER_SUPPORT_TEST_ENABLED = prefilteringTestEnabledBool;
+
+        final String maxPrefilterItemsStr = System.getProperty("oak.observation.maxPrefilterItems");
+        int maxPrefilterItemsI =50; // default: 50
+        try {
+            if (maxPrefilterItemsStr != null && maxPrefilterItemsStr.length() != 0) {
+            	maxPrefilterItemsI = Integer.parseInt(maxPrefilterItemsStr);
+                LOG.info("<clinit> using oak.observation.maxPrefilterItems = " + maxPrefilterItemsI);
+            }
+        } catch(RuntimeException e) {
+            LOG.warn("<clinit> could not parse oak.observation.maxPrefilterItems, using default (" + maxPrefilterItemsI + "): " + e, e);
+        }
+        MAX_PREFILTERED_ITEMS = maxPrefilterItemsI;
+
+        final String maxPrefilterPathDepthStr = System.getProperty("oak.observation.maxPrefilterPathDepth");
+        int maxPrefilterPathDepthI = 9; // default: 9
+        try {
+            if (maxPrefilterPathDepthStr != null && maxPrefilterPathDepthStr.length() != 0) {
+            	maxPrefilterPathDepthI = Integer.parseInt(maxPrefilterPathDepthStr);
+                LOG.info("<clinit> using oak.observation.maxPrefilterPathDepth = " + maxPrefilterPathDepthI);
+            }
+        } catch(RuntimeException e) {
+            LOG.warn("<clinit> could not parse oak.observation.maxPrefilterPathDepth, using default (" + maxPrefilterPathDepthI + "): " + e, e);
+        }
+        MAX_PREFILTER_PATH_DEPTH = maxPrefilterPathDepthI;
+    }
+    
+    private static class PrefilterCollector implements Validator {
+
+		private final CommitInfo info;
+		private final PrefilterChangeSet changeSet;
+		private final boolean isRoot;
+		private final NodeState parentNodeOrNull;
+		private final String path;
+		private final int level;
+
+		private boolean addPathOnLeave;
+		private boolean addNodeNameOnLeave;
+		private boolean addParentNodeTypeOnLeave;
+
+		private static PrefilterCollector newRoot(@Nonnull CommitInfo info) {
+			return new PrefilterCollector(info, new PrefilterChangeSet(MAX_PREFILTER_PATH_DEPTH), true, null, "/", 0);
+		}
+		
+		private static PrefilterCollector newChild(@Nonnull PrefilterCollector parent, @Nonnull NodeState parentNode, @Nonnull String childName) {
+			return new PrefilterCollector(parent.info, parent.changeSet, false, parentNode, concat(parent.path, childName), parent.level + 1);
+		}
+		
+		private PrefilterCollector(@Nonnull CommitInfo info, @Nonnull PrefilterChangeSet changeSet, boolean isRoot, NodeState parentNodeOrNull, @Nonnull String path, int level) {
+			this.info = info;
+			this.changeSet = changeSet;
+			this.isRoot = isRoot;
+			this.parentNodeOrNull = parentNodeOrNull;
+			this.path = path;
+			this.level = level;
+		}
+		
+		@Override
+		public String toString() {
+			return "PrefilterCollectorValidatorProvider[path="+path+"]";
+		}
+
+		@Override
+		public void enter(NodeState before, NodeState after) throws CommitFailedException {
+			// nothing to be done here
+		}
+
+		@Override
+		public void leave(NodeState before, NodeState after) throws CommitFailedException {
+			// first check if we have to add anything to paths and/or nodeNames
+			if (addPathOnLeave) {
+				getPaths().add(path);
+			}
+			if (addNodeNameOnLeave) {
+				getNodeNames().add(getName(path));
+			}
+			if (addParentNodeTypeOnLeave && parentNodeOrNull != null) {
+				getNodeTypes().add(parentNodeOrNull.getName("jcr:primaryType"));
+				getNodeTypes().addAll(Lists.newArrayList(parentNodeOrNull.getNames("jcr:mixinTypes")));
+			}
+
+			// then if we're not at the root, we're done
+			if (!isRoot) {
+				return;
+			}
+
+			// but if we're at the root, then we add the ChangeSet to the CommitContext of the CommitInfo
+			if (!changeSet.getOverflow()) {
+				CommitContext commitContext = (CommitContext) info.getInfo().get(CommitContext.NAME);
+				commitContext.set(COMMIT_CONTEXT_OBSERVER_PREFILTERING_CHANGESET, changeSet);
+            } // otherwise we dont set anything - which equals to !applicable
+		}
+		
+		private void onOverflow() {
+			changeSet.onOverflow();
+		}
+		
+		private Set<String> getPropertyNames() {
+			if (changeSet.getOverflow() || changeSet.getPropertyNames().size() > MAX_PREFILTERED_ITEMS) {
+				onOverflow();
+			}
+			return changeSet.getPropertyNames();
+		}
+
+		private Set<String> getNodeTypes() {
+			if (changeSet.getOverflow() || changeSet.getNodeTypes().size() > MAX_PREFILTERED_ITEMS) {
+				onOverflow();
+			}
+			return changeSet.getNodeTypes();
+		}
+
+		private Set<String> getNodeNames() {
+			if (changeSet.getOverflow() || changeSet.getNodeNames().size() > MAX_PREFILTERED_ITEMS) {
+				onOverflow();
+			}
+			return changeSet.getNodeNames();
+		}
+
+		private Set<String> getPaths() {
+			if (changeSet.getOverflow() || changeSet.getPaths().size() > MAX_PREFILTERED_ITEMS) {
+				onOverflow();
+			}
+			return changeSet.getPaths();
+		}
+
+		@Override
+		public void propertyAdded(PropertyState after) throws CommitFailedException {
+			getPropertyNames().add(after.getName());
+			// record a change at the current path (which is the parent of this property)
+			addPathOnLeave = true;
+			// and with the current node name
+			addNodeNameOnLeave = true;
+			// additionally, add parent's nodeType on leave too
+			addParentNodeTypeOnLeave = true;
+		}
+
+		@Override
+		public void propertyChanged(PropertyState before, PropertyState after) throws CommitFailedException {
+			getPropertyNames().add(before.getName());
+			// record a change at the current path (which is the parent of this property)
+			addPathOnLeave = true;
+			// and with the current node name
+			addNodeNameOnLeave = true;
+			// additionally, add parent's nodeType on leave too
+			addParentNodeTypeOnLeave = true;
+		}
+
+		@Override
+		public void propertyDeleted(PropertyState before) throws CommitFailedException {
+			getPropertyNames().add(before.getName());
+			// record a change at the current path (which is the parent of this property)
+			addPathOnLeave = true;
+			// and with the current node name
+			addNodeNameOnLeave = true;
+			// additionally, add parent's nodeType on leave too
+			addParentNodeTypeOnLeave = true;
+		}
+
+		@Override
+		public Validator childNodeAdded(String childName, NodeState after) throws CommitFailedException {
+			getNodeNames().add(childName);
+			addPathOnLeave = true;
+			// additionally, add parent's nodeType on leave too
+			addParentNodeTypeOnLeave = true;
+			if (level < MAX_PREFILTER_PATH_DEPTH) {
+				return PrefilterCollector.newChild(this, after, childName);
+			} else {
+				return null;
+			}
+		}
+
+		@Override
+		public Validator childNodeChanged(String childName, NodeState before, NodeState after) throws CommitFailedException {
+			if (level < MAX_PREFILTER_PATH_DEPTH) {
+				// if we dive deeper then subsequent calls to property* or childNode* 
+				// will add those paths correspondingly - childNodeChanged doesn't add anything
+				// in this default case
+				return PrefilterCollector.newChild(this, after, childName);
+			} else {
+				// if we don't dive deeper we're cutting off child modifications,
+				// in which case we need to:
+				
+				// record a change at the current path
+				addPathOnLeave = true;
+				// and with the current node name
+				getNodeNames().add(childName);
+				// additionally, add parent's nodeType on leave too
+				addParentNodeTypeOnLeave = true;
+				return null;
+			}
+		}
+
+		@Override
+		public Validator childNodeDeleted(String childName, NodeState before) throws CommitFailedException {
+			getNodeNames().add(childName);
+			addPathOnLeave = true;
+			// additionally, add parent's nodeType on leave too
+			addParentNodeTypeOnLeave = true;
+			if (level < MAX_PREFILTER_PATH_DEPTH) {
+				return PrefilterCollector.newChild(this, before, childName);
+			} else {
+				return null;
+			}
+		}
+    	
+    }
+
+    @Override
+    protected Validator getRootValidator(NodeState before, NodeState after, CommitInfo info) {
+        if (!PREFILTER_SUPPORT_ENABLED && !PREFILTER_SUPPORT_TEST_ENABLED) {
+            // disabled
+            return null;
+        }
+        
+        if (info == null) {
+        	// then we cannot do prefilter-collecting, as we can't store
+        	// it in the info
+        	// and if info==null that indicates an external change anyway
+        	// for which prefiltering is done differently (depending on the Mk)
+        	return null;
+        }
+        
+        return PrefilterCollector.newRoot(info);
+    }
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterCollectorValidatorProvider.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterImpl.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterImpl.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterImpl.java	(working copy)
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+import static org.apache.jackrabbit.oak.commons.PathUtils.elements;
+import static org.apache.jackrabbit.oak.commons.PathUtils.getParentPath;
+import static org.apache.jackrabbit.oak.commons.PathUtils.getDepth;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nonnull;
+
+public class PrefilterImpl implements Prefilter {
+
+	private final Set<String> rootIncludePaths;
+	private final Set<Pattern> includePathPatterns;
+	private final Set<Pattern> excludePathPatterns;
+	private final Set<String> propertyNames;
+	private final Set<String> nodeTypes;
+	private final Set<String> nodeNames;
+
+	public PrefilterImpl(@Nonnull Set<String> includePaths, boolean isDeep,
+    		Set<String> excludePaths, Set<String> propertyNames, Set<String> nodeTypes,
+    		Set<String> nodeNames) {
+		this.rootIncludePaths = new HashSet<String>();
+		this.includePathPatterns = new HashSet<Pattern>();
+		for (String aRawIncludePath : includePaths) {
+			final String aGlobbingIncludePath = !isDeep ? aRawIncludePath : concat(aRawIncludePath, "**");
+			this.rootIncludePaths.add(aRawIncludePath);
+			this.includePathPatterns.add(asPattern(aGlobbingIncludePath));
+		}
+		this.excludePathPatterns = new HashSet<Pattern>();
+		for (String aRawExcludePath : excludePaths) {
+			this.excludePathPatterns.add(asPattern(concat(aRawExcludePath, "**")));
+		}
+    	this.propertyNames = propertyNames == null ? null : new HashSet<String>(propertyNames);
+    	this.nodeTypes = nodeTypes == null ? null : new HashSet<String>(nodeTypes);
+    	this.nodeNames = nodeNames == null ? null : new HashSet<String>(nodeNames);
+    }
+	
+    private Pattern asPattern(String patternWithGlobs) {
+    	return Pattern.compile(globAsRegex(patternWithGlobs));
+	}
+
+	public static String globAsRegex(String patternWithGlobs) {
+        if (patternWithGlobs==null) {
+            throw new IllegalArgumentException("patternWithGlobs must not be null");
+        }
+        return "\\Q"+patternWithGlobs.replace("/**", "\\E(/[^/]*)*\\Q").replace("/*", "/\\E[^/]*\\Q")+"\\E";
+    }
+    
+    public static boolean match(String patternWithGlobs, String str) {
+    	final String regex = globAsRegex(patternWithGlobs);
+    	System.out.print("regexp='"+regex+"' ");
+    	return Pattern.matches(regex, str);
+    }
+    
+    @Override
+    public boolean excludeCommit(PrefilterChangeSet prefilterChangeSet) {
+    	final Set<String> paths = prefilterChangeSet.getPaths();
+    	final int maxPathLevel = prefilterChangeSet.getMaxPrefilterPathDepth();
+		final Set<String> propertyNames = prefilterChangeSet.getPropertyNames();
+		final Set<String> nodeTypes = prefilterChangeSet.getNodeTypes();
+		final Set<String> nodeNames = prefilterChangeSet.getNodeNames();
+
+		final Set<String> remainingPaths = new HashSet<String>(paths);
+		
+		// first go through excludes to remove those that are explicitly excluded
+		if (this.excludePathPatterns.size() != 0) {
+			final Iterator<String> it = remainingPaths.iterator();
+			while(it.hasNext()) {
+				final String aPath = it.next();
+				if (patternsMatch(this.excludePathPatterns, aPath)) {
+					it.remove();
+				}
+			}
+		}
+		// note that cut-off paths are not applied with excludes,
+		// eg if excludePaths contains /var/foo/bar and path contains /var/foo
+		// with a maxPathLevel of 2, that might very well mean that 
+		// the actual path would have been /var/foo/bar, but we don't know.
+		// so we cannot exclude it here and thus have a potential false negative
+		// (ie we didn't exclude it in the prefilter)
+		
+		// now remainingPaths contains what is not excluded,
+		// then check if it is included
+		boolean included = false;
+		for (String aPath : remainingPaths) {
+			// direct set contains is fastest, lets try that first
+			if (this.rootIncludePaths.contains(aPath)) {
+				included = true;
+				break;
+			}
+			if (patternsMatch(this.includePathPatterns, aPath)) {
+				included = true;
+				break;
+			}
+//			String parent = getParentPath(aPath);
+//			if (this.includePaths.contains(parent)) {
+//				included = true;
+//				break;
+//			}
+//			if (patternsMatch(this.includePathPatterns, parent)) {
+//				included = true;
+//				break;
+//			}
+		}
+		if (!included) {
+			// if the plain comparison of includes/excludes doesn't resolve
+			// into an include yet, then we must take the cut-offs into account
+			// eg includePaths could contain /var/foo/bar and path /var/foo
+			// with a maxPathLevel of 2 - in that case the direct pattern
+			// matching fails. So now we have to go through all such
+			// cut-off paths and see if a filter 'might' match - and if that's
+			// the case we include it - in which case we risk a false negative
+			// again (ie we didn't exclude it in the prefilter even though it might 
+			// be meant to be)
+			// another advanced example: include filter is /var/*/bar
+			// and path is /var/foo (cut-off: 2)
+			for (String string : remainingPaths) {
+				if (getDepth(string) < maxPathLevel - 1) {
+					// then this cannot be a cut-off path
+					continue;
+				}
+				if (prefixMatch(this.rootIncludePaths, string)) {
+					included = true;
+					break;
+				}
+				String parent = getParentPath(string); 
+				if (prefixMatch(this.rootIncludePaths, parent)) {
+					included = true;
+					break;
+				}
+			}
+		}
+		
+		if (!included) {
+			// well then we can definitely say that this commit is excluded
+			return true;
+		}
+		
+		int maxPathDepth = 0;
+		for (String aPath : remainingPaths) {
+			maxPathDepth = Math.max(maxPathDepth, getDepth(aPath));
+		}
+		boolean cutOff = maxPathDepth >= maxPathLevel;
+		
+		if (!cutOff && this.propertyNames != null && this.propertyNames.size() != 0) {
+			included = false;
+			for (String aProperty : propertyNames) {
+				if (this.propertyNames.contains(aProperty)) {
+					included = true;
+					break;
+				}
+			}
+			// if propertyNames are defined then if we can't find any
+			// at this stage (if !included) then this equals to filtering out
+			if (!included) {
+				return true;
+			}
+			// otherwise we have found a match, but one of the nodeType/nodeNames
+			// could still filter out, so we have to continue...
+		}
+		
+		if (!cutOff && this.nodeTypes != null && this.nodeTypes.size() != 0) {
+			included = false;
+			for (String aNodeType : nodeTypes) {
+				if (this.nodeTypes.contains(aNodeType)) {
+					included = true;
+					break;
+				}
+			}
+			// same story here: if nodeTypes is defined and we can't find any match
+			// then we're done now
+			if (!included) {
+				return true;
+			}
+			// otherwise, again, continue
+		}
+
+		if (!cutOff && this.nodeNames != null && this.nodeNames.size() != 0) {
+			included = false;
+			for (String aNodeName : nodeNames) {
+				if (this.nodeNames.contains(aNodeName)) {
+					included = true;
+					break;
+				}
+			}
+			// and a 3rd time, if we can't find any nodeName match
+			// here, then we're filtering out
+			if (!included) {
+				return true;
+			} else {
+				// but unlike in the other 2 cases, if we're including
+				// then we're definitely not filtering out
+				return false;
+			}
+		}
+		
+		// at this stage we haven't found any exclude, so we're likely including
+		return false;
+	}
+
+	private static boolean prefixMatch(Set<String> pathsWithGlobs, String pathPrefix) {
+		for (String aPathWithGlobs : pathsWithGlobs) {
+			if (aPathWithGlobs.startsWith(pathPrefix)) {
+				return true;
+			}
+		}
+		for (String aPathWithGlobs : pathsWithGlobs) {
+			if (isPrefix(aPathWithGlobs, pathPrefix)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	private static boolean isPrefix(String aPathWithGlobs, String pathPrefix) {
+		final Iterator<String> it = elements(pathPrefix).iterator();
+		final Iterator<String> globIt = elements(aPathWithGlobs).iterator();
+		while(it.hasNext()) {
+			final String pathElem = it.next();
+			if (!globIt.hasNext()) {
+				return false;
+			}
+			String aGlobElem = globIt.next();
+			if (aGlobElem.equals(pathElem) || aGlobElem.equals("*")) {
+				// perfect
+				continue;
+			} else if (!aGlobElem.equals("**")) {
+				// mismatch
+				return false;
+			} else {
+				// ** case
+				// this means that the 'aPathWithGlobs' would now
+				// accept anything at this level and below - and since
+				// pathPrefix is only a prefix it is indeed possible
+				// that whatever comes after the ** in the glob 
+				// actually shows up.
+				// which means: this is a match
+				return true;
+			}
+		}
+		// if we've gone through all elements in the path 
+		// and haven't hit a conclusiive true/false case, 
+		// then this is the prefix case, ie a match
+		return true;
+	}
+
+	private static boolean patternsMatch(Set<Pattern> pathPatterns, String path) {
+		if (path == null) {
+			return false;
+		}
+		for (Pattern pathPattern : pathPatterns) {
+			if (pathPattern.matcher(path).matches()) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	private static void testContainsPrefix(String prefixPath, String... globPaths) {
+		Set<String> paths = new HashSet<String>(Arrays.asList(globPaths));
+		System.out.println("Contains["+prefixPath+", "+paths+"]: "+prefixMatch(paths, prefixPath));
+	}
+	
+	public static void main(String[] args) throws Exception {
+		testContainsPrefix("/a/b/c", "/a/b");
+		testContainsPrefix("/a/b/c", "/a/b/c");
+		testContainsPrefix("/a/b/c", "/a/b/c/d");
+		testContainsPrefix("/a/b/c", "/a/b/c/d", "/a/b");
+		testContainsPrefix("/a/b/c", "/a/b/c/d", "/a/b/c");
+		testContainsPrefix("/a/b/c", "/a/*");
+		testContainsPrefix("/a/b/c", "/a/**");
+		testContainsPrefix("/a/b/c", "/a/b/*");
+		testContainsPrefix("/a/b/c", "/**");
+		testContainsPrefix("/a/b", "/**");
+		testContainsPrefix("/a", "/**");
+		testContainsPrefix("/", "/**");
+		Pattern.matches("\\Q\\E(/[^/])*\\Q\\E", "/");
+		Pattern.matches("(/[^/])*\\Q\\E", "/");
+		Pattern.matches("\\Q\\E(/[^/])*", "/");
+		Pattern.matches("(/[^/]*)*", "/");
+	}
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/PrefilterImpl.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java	(revision 1763448)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java	(working copy)
@@ -34,12 +34,13 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import com.google.common.base.Predicate;
 import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Predicate;
+
 /**
  * An observer that uses a change queue and a background thread to forward
  * content changes to another observer. The mechanism is designed so that
@@ -58,7 +59,7 @@
     /**
      * Signal for the background thread to stop processing changes.
      */
-    private static final ContentChange STOP = new ContentChange(null, null);
+    private static final ContentChange STOP = new ContentChange(null, null, null);
 
     /**
      * The receiving observer being notified off the background thread.
@@ -92,10 +93,12 @@
             Boolean.parseBoolean(System.getProperty("oak.observation.alwaysCollapseExternal", "false"));
 
     private static class ContentChange {
+        private final NodeState noopPreviousRoot;
         private final NodeState root;
         private final CommitInfo info;
         private final long created = System.currentTimeMillis();
-        ContentChange(NodeState root, CommitInfo info) {
+        ContentChange(NodeState noopPreviousRoot, NodeState root, CommitInfo info) {
+            this.noopPreviousRoot = noopPreviousRoot;
             this.root = root;
             this.info = info;
         }
@@ -107,6 +110,12 @@
      */
     private ContentChange last;
 
+    private NodeState noopPreviousRoot;
+    
+    private NodeState previousRoot;
+
+    private boolean previousWasExcluded;
+
     /**
      * Flag to indicate that some content changes were dropped because
      * the queue was full.
@@ -129,6 +138,11 @@
                 try {
                     ContentChange change = queue.poll();
                     if (change != null && change != STOP) {
+                        if (change.noopPreviousRoot != null) {
+                            // a ContentChange that carries a noopPreviousRoot
+                            // indicates a NOOP change
+                            observer.contentChanged(change.noopPreviousRoot, CommitInfo.NOOP_CHANGE);
+                        }
                         observer.contentChanged(change.root, change.info);
                         removed(queue.size(), change.created);
                         currentTask.onComplete(completionHandler);
@@ -182,6 +196,10 @@
         this(observer, executor, 1000);
     }
 
+    protected Observer getObserver() {
+        return observer;
+    }
+    
     /**
      * Called when ever an item has been added to the queue
      * @param queueSize  size of the queue
@@ -272,7 +290,7 @@
         checkState(!stopped);
         checkNotNull(root);
 
-        if (alwaysCollapseExternalEvents && info == null && last != null && last.info == null) {
+        if (!previousWasExcluded && alwaysCollapseExternalEvents && info == null && last != null && last.info == null) {
             // This is an external change. If the previous change was
             // also external, we can drop it from the queue (since external
             // changes in any case can cover multiple commits) to help
@@ -281,35 +299,66 @@
             full = false;
         }
 
-        ContentChange change;
+        final boolean excluded = isExcluded(previousRoot, root, info);
+        final ContentChange change;
+        final NodeState noopPreviousRoot;
+        if (previousWasExcluded) {
+            // skippedPreviousRoot != null indicates a 'NOOP_CHANGE'
+            noopPreviousRoot = this.noopPreviousRoot;
+        } else {
+            // skippedPreviousRoot == null indicates a normal ContentChange
+            noopPreviousRoot = null;
+        }
         if (full) {
             // If the queue is full, some commits have already been skipped
             // so we need to drop the possible local commit information as
             // only external changes can be merged together to larger chunks.
-            change = new ContentChange(root, null);
+            change = new ContentChange(noopPreviousRoot, root, null);
         } else {
-            change = new ContentChange(root, info);
+            change = new ContentChange(noopPreviousRoot, root, info);
         }
 
-        // Try to add this change to the queue without blocking, and
-        // mark the queue as full if there wasn't enough space
-        full = !queue.offer(change);
+        if (!excluded) {
+            // Try to add this change to the queue without blocking, and
+            // mark the queue as full if there wasn't enough space
+            full = !queue.offer(change);
+    
+            if (!full) {
+                // Keep track of the last change added, so we can do the
+                // compacting of external changes shown above.
+                last = change;
 
-        if (!full) {
-            // Keep track of the last change added, so we can do the
-            // compacting of external changes shown above.
-            last = change;
+                // we passed the previousRoot flag via noopPreviousRoot
+                // in ContentChange to the queue, so we can now reset
+                // the flag here
+                this.previousWasExcluded = false;
+                this.noopPreviousRoot = null;
+            } else {
+                // if the queue is full now, then we must not update
+                // the previousRoot and previousWasExcluded, as they
+                // must be handled by being added as noopPreviousRoot
+                // in ContentChange to the queue - which they currently
+                // can't - so leave them as is
+            }
+
+            // Set the completion handler on the currently running task. Multiple calls
+            // to onComplete are not a problem here since we always pass the same value.
+            // Thus there is no question as to which of the handlers will effectively run.
+            currentTask.onComplete(completionHandler);
+            added(queue.size());
+        } else {
+            this.previousWasExcluded = true;
+            this.noopPreviousRoot = root;
         }
-
-        // Set the completion handler on the currently running task. Multiple calls
-        // to onComplete are not a problem here since we always pass the same value.
-        // Thus there is no question as to which of the handlers will effectively run.
-        currentTask.onComplete(completionHandler);
-        added(queue.size());
+        this.previousRoot = root;
     }
 
     //------------------------------------------------------------< internal >---
 
+    protected boolean isExcluded(NodeState before, NodeState after, CommitInfo info) {
+    	return false;
+    }
+    
     private static Logger getLogger(@Nonnull Observer observer) {
         return LoggerFactory.getLogger(checkNotNull(observer).getClass());
     }
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java	(revision 1763448)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java	(working copy)
@@ -45,6 +45,15 @@
     public static final CommitInfo EMPTY =
             new CommitInfo(OAK_UNKNOWN, OAK_UNKNOWN);
 
+    /**
+     * OAK-4796: commitInfo object representing a 'no op ie filtered change'. 
+     * Used to indicate Observers that a particular contentChanged call 
+     * should be ignored (but nevertheless the call is made, in particular
+     * to allow Observers to take note of the new root)
+     */
+    public static final CommitInfo NOOP_CHANGE =
+            new CommitInfo(OAK_UNKNOWN, OAK_UNKNOWN);
+    
     private final String sessionId;
 
     private final String userId;
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java	(revision 1763448)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java	(working copy)
@@ -46,6 +46,7 @@
 import org.apache.jackrabbit.oak.plugins.nodetype.TypeEditorProvider;
 import org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent;
 import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
+import org.apache.jackrabbit.oak.plugins.observation.filter.PrefilterCollectorValidatorProvider;
 import org.apache.jackrabbit.oak.plugins.version.VersionHook;
 import org.apache.jackrabbit.oak.query.QueryEngineSettings;
 import org.apache.jackrabbit.oak.security.SecurityProviderImpl;
@@ -120,6 +121,7 @@
             with(new NamespaceEditorProvider());
             with(new TypeEditorProvider());
             with(new ConflictValidatorProvider());
+            with(new PrefilterCollectorValidatorProvider());
 
             with(new ReferenceEditorProvider());
             with(new ReferenceIndexProvider());
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java	(revision 1763448)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java	(working copy)
@@ -49,8 +49,12 @@
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterConfigMBean;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
 import org.apache.jackrabbit.oak.plugins.observation.filter.Filters;
+import org.apache.jackrabbit.oak.plugins.observation.filter.Prefilter;
+import org.apache.jackrabbit.oak.plugins.observation.filter.PrefilterChangeSet;
+import org.apache.jackrabbit.oak.plugins.observation.filter.PrefilterCollectorValidatorProvider;
 import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
 import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -89,6 +93,13 @@
      * kicks in.
      */
     public static final int MAX_DELAY;
+    
+    /**
+     * number of events in the observation queue after which prefiltering will
+     * become active - if fewer than this many events are in the queue, prefiltering
+     * is not applied.
+     */
+    private static final int PREFILTERING_LIMIT;
 
     // OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using System.properties for now
     static {
@@ -114,6 +125,18 @@
         }
         DELAY_THRESHOLD = delayThreshold;
         MAX_DELAY = maxDelay;
+
+        final String prefilteringLimitStr = System.getProperty("oak.observation.prefilteringLimit");
+        int prefilteringLimit = 20; /* default is 20 */
+        try{
+            if (prefilteringLimitStr != null && prefilteringLimitStr.length() != 0) {
+                prefilteringLimit = Integer.parseInt(prefilteringLimitStr);
+                LOG.info("<clinit> using oak.observation.prefilteringLimit of " + prefilteringLimit);
+            }
+        } catch(RuntimeException e) {
+            LOG.warn("<clinit> could not parse oak.observation.prefilteringLimitd, using default(" + prefilteringLimit + "): " + e, e);
+        }
+        PREFILTERING_LIMIT = prefilteringLimit;
     }
     
     private static final AtomicInteger COUNTER = new AtomicInteger();
@@ -147,6 +170,22 @@
 
     private volatile NodeState previousRoot;
 
+    /**
+     * for statistics: tracks how many times prefiltering excluded a commit
+     */
+    private int prefilterExcludeCount;
+    
+    /**
+     * for statistics: tracks how many times prefiltering included a commit
+     */
+    private int prefilterIncludeCount;
+    
+    /**
+     * for statistics: tracks how many times prefiltering was ignored (not evaluated at all),
+     * either because it was disabled, queue too small, CommitInfo null or CommitContext null
+     */
+    private int prefilterSkipCount;
+    
     public ChangeProcessor(
             ContentSession contentSession,
             NamePathMapper namePathMapper,
@@ -175,6 +214,29 @@
         filterProvider.set(filter);
     }
 
+    @Nonnull
+    public ChangeProcessorMBean getMBean(){
+        return new ChangeProcessorMBean() {
+
+			@Override
+			public int getPrefilterExcludeCount() {
+				return prefilterExcludeCount;
+			}
+
+			@Override
+			public int getPrefilterIncludeCount() {
+				return prefilterIncludeCount;
+			}
+
+			@Override
+			public int getPrefilterSkipCount() {
+				return prefilterSkipCount;
+			}
+        	
+        };
+    }
+
+    
     /**
      * Start this change processor
      * @param whiteboard  the whiteboard instance to used for scheduling individual
@@ -185,16 +247,18 @@
         checkState(registration == null, "Change processor started already");
         final WhiteboardExecutor executor = new WhiteboardExecutor();
         executor.start(whiteboard);
-        final BackgroundObserver observer = createObserver(executor);
+        backgroundObserver = createObserver(executor);
         listenerId = COUNTER.incrementAndGet() + "";
         Map<String, String> attrs = ImmutableMap.of(LISTENER_ID, listenerId);
         String name = tracker.toString();
         registration = new CompositeRegistration(
-            registerObserver(whiteboard, observer),
+            registerObserver(whiteboard, backgroundObserver),
             registerMBean(whiteboard, EventListenerMBean.class,
                     tracker.getListenerMBean(), "EventListener", name, attrs),
             registerMBean(whiteboard, BackgroundObserverMBean.class,
-                    observer.getMBean(), BackgroundObserverMBean.TYPE, name, attrs),
+                    backgroundObserver.getMBean(), BackgroundObserverMBean.TYPE, name, attrs),
+            registerMBean(whiteboard, ChangeProcessorMBean.class,
+                    getMBean(), ChangeProcessorMBean.TYPE, name, attrs),
             //TODO If FilterProvider gets changed later then MBean would need to be
             // re-registered
             registerMBean(whiteboard, FilterConfigMBean.class,
@@ -202,7 +266,7 @@
             new Registration() {
                 @Override
                 public void unregister() {
-                    observer.close();
+                    backgroundObserver.close();
                 }
             },
             new Registration() {
@@ -224,11 +288,13 @@
         return new BackgroundObserver(this, executor, queueLength) {
             private volatile long delay;
             private volatile boolean blocking;
+            private volatile int lastQueueSize;
 
             @Override
             protected void added(int queueSize) {
                 maxQueueLength.recordValue(queueSize);
                 tracker.recordQueueLength(queueSize);
+                lastQueueSize = queueSize;
 
                 if (queueSize == queueLength) {
                     if (commitRateLimiter != null) {
@@ -278,12 +344,46 @@
             protected void removed(int queueSize, long created) {
                 maxQueueLength.recordValue(queueSize);
                 tracker.recordQueueLength(queueSize, created);
+                lastQueueSize = queueSize;
             }
+            
+            @Override
+            public String toString() {
+                return "PrefilteringBackgroundObserver for "+ChangeProcessor.this;
+            }
+            
+            @Override
+            protected boolean isExcluded(NodeState before, NodeState after, CommitInfo info) {
+            	if (!PrefilterCollectorValidatorProvider.PREFILTER_SUPPORT_ENABLED) {
+            		prefilterSkipCount++;
+            		return false;
+            	}
+            	if (lastQueueSize <= PREFILTERING_LIMIT) {
+                    // only do prefiltering if the queue is larger than a configurable limit
+                    // to keep commit cost low for the majority of commits, only add cost
+                    // when system is under pressure.
+            		prefilterSkipCount++;
+                    return false;
+                }
+				Boolean isExcludedResult = isExcludedByPrefiltering(before, after, info);
+				if (isExcludedResult == null) {
+					// null==skip => false
+					prefilterSkipCount++;
+					return false;
+				} else if (isExcludedResult) {
+					prefilterExcludeCount++;
+					return false;
+				} else {
+					prefilterIncludeCount++;
+					return true;
+				}
+			}
         };
     }
 
     private final Monitor runningMonitor = new Monitor();
     private final RunningGuard running = new RunningGuard(runningMonitor);
+    private BackgroundObserver backgroundObserver;
 
     /**
      * Try to stop this change processor if running. This method will wait
@@ -333,10 +433,38 @@
 
     @Override
     public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
-        if (previousRoot != null) {
+        // OAK-4796 : if the CommitInfo is a NOOP, then we should skip
+        // this contentChanged and instead just remember the root as the new previousRoot
+        // The NOOP is set by the BackgroundObserver when it finds out
+        // that the ObserationFilterValidatorProvider (and its Validators)
+        // have figured out that this Observer's filter would not result in any event
+        final boolean noopChange = info == CommitInfo.NOOP_CHANGE;
+
+        Boolean wouldBeExcluded = null;
+        if (PrefilterCollectorValidatorProvider.PREFILTER_SUPPORT_TEST_ENABLED) {
+        	// OAK-4796 testing mechanism: the pre-filtering is either activated
+        	// or in test-mode - in which case instead of a NOOP, the normal 
+        	// CommitInfo is passed, but it contains a CommitAttribute containing
+        	// the interested providers - against which the real filter is then evaluated.
+        	// So if the real filter finds an event, it checks if the pre-filtering 
+        	// would have correctly detected this - and vice-verca - and warns if that's
+        	// not the case.
+        	//TODO: remove this testing mechanism after a while
+	        try{
+	        	wouldBeExcluded = isExcludedByPrefiltering(previousRoot, root, info);
+	        	if (wouldBeExcluded == null) {
+	        		// skip => false
+	        		wouldBeExcluded = false;
+	        	}
+	        } catch(Exception e) {
+	        	LOG.warn("contentChanged: exception in wouldBeExcludedCommit: "+e, e);
+	        }
+        }
+        if (previousRoot != null && !noopChange) {
             try {
                 long start = PERF_LOGGER.start();
                 FilterProvider provider = filterProvider.get();
+                boolean onEventInvoked = false;
                 // FIXME don't rely on toString for session id
                 if (provider.includeCommit(contentSession.toString(), info)) {
                     EventFilter filter = provider.getFilter(previousRoot, root);
@@ -349,6 +477,7 @@
                     if (hasEvents && runningMonitor.enterIf(running)) {
                         try {
                             CountingIterator countingEvents = new CountingIterator(events);
+                            onEventInvoked = true;
                             eventListener.onEvent(countingEvents);
                             countingEvents.updateCounters(eventCount, eventDuration);
                         } finally {
@@ -356,6 +485,22 @@
                         }
                     }
                 }
+                if (wouldBeExcluded != null) {
+                	// OAK-4796 testing mechanism
+                    if (wouldBeExcluded && onEventInvoked) {
+                    	// this is not ok, an event would have gotten excluded-by-prefiltering even though
+                    	// it actually got an event.
+                        LOG.warn("contentChanged: delivering event which would have been prefiltered, "
+                        		+ "info={}, this={}, listener={}", info, this, eventListener);
+                    } else if (!wouldBeExcluded && !onEventInvoked && info != null 
+                    		&& info != CommitInfo.EMPTY && info != CommitInfo.NOOP_CHANGE) {
+                    	// this can occur arbitrarily frequent. as prefiltering is not perfect, it can
+                    	// have false negatives - ie it can include even though no event is then created
+                    	// hence we can only really log at debug here
+                        LOG.debug("contentChanged: no event to deliver but not prefiltered, info={}, this={}, listener={}", 
+                        		info, this, eventListener);
+                    }
+                }
                 PERF_LOGGER.end(start, 100,
                         "Generated events (before: {}, after: {})",
                         previousRoot, root);
@@ -475,4 +620,62 @@
                 + ", commitRateLimiter=" + commitRateLimiter
                 + ", running=" + running.isSatisfied() + "]";
     }
+
+    /**
+     * Decide if for the given before/after/info triple we're including (false), excluding (true)
+     * or skipping (null=>false).
+     * <p>
+     * Reason for choosing Boolean over boolean as return type is to be able to do statistics
+     * on skip (==null).
+     * @return null indicates include but not because the prefilter includes, but because 
+     * we have to skip this element due to CommitInfo or CommitContext being null.
+     */
+	private Boolean isExcludedByPrefiltering(NodeState before, NodeState after, CommitInfo info) {
+		if (info == null) {
+			return null;
+		}
+		Map<String, Object> m = info.getInfo();
+		if (m == null) {
+			return null;
+		}
+		CommitContext commitAttributes = (CommitContext) m.get(CommitContext.NAME);
+		if (commitAttributes == null) {
+			return null;
+		}
+		if (before == null || after == null) {
+			// likely only occurs at startup
+			// we can't do any diffing etc, so just not exclude it
+			return null;
+		}
+
+		final FilterProvider fp = filterProvider.get();
+		// FIXME don't rely on toString for session id
+		if (!fp.includeCommit(contentSession.toString(), info)) {
+			// 'classic' (and cheap pre-) filtering
+			return true;
+		}
+		EventFilter filter = fp.getFilter(before, after);
+		if (filter == null) {
+			// if the filter provider doesn't return a filter this corresponds
+			// to filtering-out
+			return true;
+		}
+		// if the 'oak.isFiltered' flag is set in the
+		// 'oak.commitAttributes'
+		// we skip this entry in the ChangeProcessor.
+		// this results in a 'skip' of changes and ensures the
+		// 'previousRoot' is again set correctly (see below in else)
+		PrefilterChangeSet prefilterChangeSet = (PrefilterChangeSet) commitAttributes
+				.get(PrefilterCollectorValidatorProvider.COMMIT_CONTEXT_OBSERVER_PREFILTERING_CHANGESET);
+		if (prefilterChangeSet == null) {
+			// then can't do any prefiltering since it was not
+			// able to complete the sets (within the given boundaries)
+			// (this corresponds to a large commit, which thus can't
+			// go through prefiltering)
+			return null;
+		}
+
+		final Prefilter prefilter = fp;
+		return prefilter.excludeCommit(prefilterChangeSet);
+	}
 }
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java	(revision 0)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java	(working copy)
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.jcr.observation;
+
+public interface ChangeProcessorMBean {
+    String TYPE = "ChangeProcessorStats";
+
+    int getPrefilterExcludeCount();
+
+    int getPrefilterIncludeCount();
+    
+    int getPrefilterSkipCount();
+}

Property changes on: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java	(revision 1763448)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java	(working copy)
@@ -84,6 +84,12 @@
                 referenceInterface = BackgroundObserverMBean.class,
                 policy = ReferencePolicy.DYNAMIC,
                 cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
+        @Reference(name = "changeProcessorMBean",
+        		bind = "bindChangeProcessorMBean",
+        		unbind = "unbindChangeProcessorMBean",
+        		referenceInterface = ChangeProcessorMBean.class,
+        		policy = ReferencePolicy.DYNAMIC,
+        		cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
         @Reference(name = "filterConfigMBean",
                 bind = "bindFilterConfigMBean",
                 unbind = "unbindFilterConfigMBean",
@@ -96,6 +102,7 @@
     private final AtomicInteger observerCount = new AtomicInteger();
     private final Map<ObjectName, EventListenerMBean> eventListeners = Maps.newConcurrentMap();
     private final Map<ObjectName, BackgroundObserverMBean> bgObservers = Maps.newConcurrentMap();
+    private final Map<ObjectName, ChangeProcessorMBean> changeProcessors = Maps.newConcurrentMap();
     private final Map<ObjectName, FilterConfigMBean> filterConfigs = Maps.newConcurrentMap();
 
     private Registration mbeanReg;
@@ -201,6 +208,11 @@
                     m.observerMBean = ef.getValue();
                 }
             }
+            for (Map.Entry<ObjectName, ChangeProcessorMBean> ef : changeProcessors.entrySet()){
+                if (Objects.equal(getListenerId(ef.getKey()), listenerId)){
+                    m.changeProcessorMBean = ef.getValue();
+                }
+            }
             mbeans.add(m);
         }
         return mbeans;
@@ -249,6 +261,16 @@
     }
 
     @SuppressWarnings("unused")
+    protected void bindChangeProcessorMBean(ChangeProcessorMBean mbean, Map<String, ?> config){
+    	changeProcessors.put(getObjectName(config), mbean);
+    }
+
+    @SuppressWarnings("unused")
+    protected void unbindChangeProcessorMBean(ChangeProcessorMBean mbean, Map<String, ?> config){
+    	changeProcessors.remove(getObjectName(config));
+    }
+
+    @SuppressWarnings("unused")
     protected void bindListenerMBean(EventListenerMBean mbean, Map<String, ?> config){
         eventListeners.put(getObjectName(config), mbean);
     }
@@ -280,6 +302,7 @@
     private static class ListenerMBeans {
         EventListenerMBean eventListenerMBean;
         BackgroundObserverMBean observerMBean;
+        ChangeProcessorMBean changeProcessorMBean;
         FilterConfigMBean filterConfigMBean;
     }
 
@@ -300,6 +323,9 @@
                 "ratioOfTimeSpentProcessingEvents",
                 "eventConsumerTimeRatio",
                 "queueBacklogMillis",
+                "prefilterSkips",
+                "prefilterExcludes",
+                "prefilterIncludes",
                 "queueSize",
                 "localEventCount",
                 "externalEventCount",
@@ -329,6 +355,9 @@
                 SimpleType.INTEGER,
                 SimpleType.INTEGER,
                 SimpleType.INTEGER,
+                SimpleType.INTEGER,
+                SimpleType.INTEGER,
+                SimpleType.INTEGER,
                 SimpleType.STRING,
                 SimpleType.BOOLEAN,
                 SimpleType.BOOLEAN,
@@ -373,6 +402,9 @@
                     mbeans.eventListenerMBean.getRatioOfTimeSpentProcessingEvents(),
                     mbeans.eventListenerMBean.getEventConsumerTimeRatio(),
                     mbeans.eventListenerMBean.getQueueBacklogMillis(),
+                    mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterSkipCount(),
+                    mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterExcludeCount(),
+                    mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterIncludeCount(),
                     mbeans.observerMBean.getQueueSize(),
                     mbeans.observerMBean.getLocalEventCount(),
                     mbeans.observerMBean.getExternalEventCount(),
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java	(revision 1763448)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java	(working copy)
@@ -58,6 +58,7 @@
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterBuilder.Condition;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
 import org.apache.jackrabbit.oak.plugins.observation.filter.PermissionProviderFactory;
+import org.apache.jackrabbit.oak.plugins.observation.filter.PrefilterImpl;
 import org.apache.jackrabbit.oak.plugins.observation.filter.Selectors;
 import org.apache.jackrabbit.oak.spi.commit.Observable;
 import org.apache.jackrabbit.oak.spi.security.authorization.AuthorizationConfiguration;
@@ -238,7 +239,8 @@
 
         List<Condition> excludeConditions = createExclusions(filterBuilder, excludedPaths);
 
-        filterBuilder
+        final String[] validatedNodeTypeNames = validateNodeTypeNames(nodeTypeName);
+		filterBuilder
             .includeSessionLocal(!noLocal)
             .includeClusterExternal(!noExternal)
             .includeClusterLocal(!noInternal)
@@ -249,13 +251,22 @@
                     filterBuilder.moveSubtree(),
                     filterBuilder.eventType(eventTypes),
                     filterBuilder.uuid(Selectors.PARENT, uuids),
-                    filterBuilder.nodeType(Selectors.PARENT, validateNodeTypeNames(nodeTypeName)),
+                    filterBuilder.nodeType(Selectors.PARENT, validatedNodeTypeNames),
                     filterBuilder.accessControl(permissionProviderFactory)));
 
         // FIXME support multiple path in ListenerTracker
         ListenerTracker tracker = new WarningListenerTracker(
                 !noExternal, listener, eventTypes, absPath, isDeep, uuids, nodeTypeName, noLocal);
 
+        // OAK-4796 : prefiltering support. here we have explicit yes/no/maybe filtering
+        // for things like propertyNames/nodeTypes/nodeNames/paths which cannot be 
+        // applied on the full-fledged filterBuilder above but requires an explicit 'prefilter' for that.
+        filterBuilder.setPrefilter(
+        		new PrefilterImpl(includePaths, isDeep, excludedPaths, 
+        				null, 
+        				validatedNodeTypeNames==null ? null : newHashSet(validatedNodeTypeNames), 
+        				null));
+        
         addEventListener(listener, tracker, filterBuilder.build());
     }
 
Index: oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
===================================================================
--- oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java	(revision 1763448)
+++ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java	(working copy)
@@ -95,7 +95,7 @@
     private static final String REFERENCEABLE_NODE = "\"referenceable\"";
     private static final String TEST_PATH = '/' + TEST_NODE;
     private static final String TEST_TYPE = "mix:test";
-    public static final int TIME_OUT = 60;
+    public static final int TIME_OUT = 6;
 
     private Session observingSession;
     private ObservationManager observationManager;
@@ -431,7 +431,25 @@
             observationManager.removeEventListener(listener);
         }
     }
+    
+    @Test
+    public void propertyFilter() throws Exception {
+        Node root = getNode("/");
+        ExpectationListener listener = new ExpectationListener();
+        observationManager.addEventListener(listener, PROPERTY_ADDED, "/a/b", false, null, null, false);
+        Node a = root.addNode("a");
+        Node b = a.addNode("b");
+        listener.expect("/a/b/jcr:primaryType", PROPERTY_ADDED);
 
+        listener.expectAdd(b.setProperty("propName", 1));
+    	root.getSession().save();
+
+    	List<Expectation> missing = listener.getMissing(TIME_OUT, TimeUnit.SECONDS);
+        assertTrue("Missing events: " + missing, missing.isEmpty());
+        List<Event> unexpected = listener.getUnexpected();
+        assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+    }
+    
     @Test
     public void pathFilter() throws Exception {
         final String path = "/events/only/here";
