Index: oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java	(revision 1761440)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java	(working copy)
@@ -27,6 +27,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -130,6 +131,8 @@
 
     private final NodeStore store;
     
+    private final List<Observable> observables = new LinkedList<Observable>();
+    
     private final List<RepositoryInitializer> initializers = newArrayList();
 
     private QueryEngineSettings queryEngineSettings = new QueryEngineSettings();
@@ -272,7 +275,9 @@
                     }
                 }
             } else if (type == Observer.class && store instanceof Observable) {
-                observerSubscription.register(((Observable) store).addObserver((Observer) service));
+                for (Observable observable : observables) {
+                    observerSubscription.register(observable.addObserver((Observer) service));
+                }
             }
 
             ObjectName objectName = null;
@@ -338,6 +343,9 @@
 
     public Oak(NodeStore store) {
         this.store = checkNotNull(store);
+        if (store instanceof Observable) {
+            this.observables.add((Observable) store);
+        }
     }
 
     public Oak() {
@@ -449,6 +457,9 @@
     @Nonnull
     public Oak with(@Nonnull EditorProvider provider) {
         editorProviders.add(checkNotNull(provider));
+        if (provider instanceof Observable) {
+            this.observables.add((Observable) provider);
+        }
         return this;
     }
 
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java	(revision 1761440)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/EventGenerator.java	(working copy)
@@ -119,7 +119,60 @@
                     c.counter);
         }
     }
+    
+    public static List<String[]> listReorderedChildNodeNames(PropertyState before, PropertyState after) {
+        // list the child node names before and after the change
+        List<String> beforeNames =
+                newArrayList(before.getValue(NAMES));
+        List<String> afterNames =
+                newArrayList(after.getValue(NAMES));
 
+        // check only those names that weren't added or removed
+        beforeNames.retainAll(newHashSet(afterNames));
+        afterNames.retainAll(newHashSet(beforeNames));
+
+        List<String[]> result = null;
+        // Selection sort beforeNames into afterNames,
+        // recording the swaps as we go
+        for (int a = 0; a < afterNames.size() - 1; a++) {
+            String beforeName = beforeNames.get(a);
+            String afterName = afterNames.get(a);
+            if (!afterName.equals(beforeName)) {
+                // Find afterName in the beforeNames list.
+                // This loop is guaranteed to stop because both
+                // lists contain the same names and we've already
+                // processed all previous names.
+                int b = a + 1;
+                while (!afterName.equals(beforeNames.get(b))) {
+                    b++;
+                }
+
+                // Swap the non-matching before name forward.
+                // No need to beforeNames.set(a, afterName),
+                // as we won't look back there anymore.
+                beforeNames.set(b, beforeName);
+
+                // find the destName of the orderBefore operation
+                String destName = null;
+                Iterator<String> iterator =
+                        after.getValue(NAMES).iterator();
+                while (destName == null && iterator.hasNext()) {
+                    if (afterName.equals(iterator.next())) {
+                        if (iterator.hasNext()) {
+                            destName = iterator.next();
+                        }
+                    }
+                }
+
+                if (result == null) {
+                    result = new LinkedList<String[]>();
+                }
+                result.add(new String[] {destName, afterName});
+            }
+        }
+        return result;
+    }
+
     private class Continuation implements NodeStateDiff, Runnable {
 
         /**
@@ -195,51 +248,14 @@
                 // check for reordering of child nodes
                 if (OAK_CHILD_ORDER.equals(before.getName())) {
                     // list the child node names before and after the change
-                    List<String> beforeNames =
-                            newArrayList(before.getValue(NAMES));
-                    List<String> afterNames =
-                            newArrayList(after.getValue(NAMES));
-
-                    // check only those names that weren't added or removed
-                    beforeNames.retainAll(newHashSet(afterNames));
-                    afterNames.retainAll(newHashSet(beforeNames));
-
-                    // Selection sort beforeNames into afterNames,
-                    // recording the swaps as we go
-                    for (int a = 0; a < afterNames.size() - 1; a++) {
-                        String beforeName = beforeNames.get(a);
-                        String afterName = afterNames.get(a);
-                        if (!afterName.equals(beforeName)) {
-                            // Find afterName in the beforeNames list.
-                            // This loop is guaranteed to stop because both
-                            // lists contain the same names and we've already
-                            // processed all previous names.
-                            int b = a + 1;
-                            while (!afterName.equals(beforeNames.get(b))) {
-                                b++;
-                            }
-
-                            // Swap the non-matching before name forward.
-                            // No need to beforeNames.set(a, afterName),
-                            // as we won't look back there anymore.
-                            beforeNames.set(b, beforeName);
-
-                            // find the destName of the orderBefore operation
-                            String destName = null;
-                            Iterator<String> iterator =
-                                    after.getValue(NAMES).iterator();
-                            while (destName == null && iterator.hasNext()) {
-                                if (afterName.equals(iterator.next())) {
-                                    if (iterator.hasNext()) {
-                                        destName = iterator.next();
-                                    }
-                                }
-                            }
-
+                    
+                    List<String[]> list = listReorderedChildNodeNames(before, after);
+                    if (list != null) {
+                        for (String[] childNames : list) {
                             // deliver the reordering event
                             handler.nodeReordered(
-                                    destName, afterName,
-                                    this.after.getChildNode(afterName));
+                                    childNames[0], childNames[1],
+                                    this.after.getChildNode(childNames[1]));
                         }
                     }
                 }
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java	(revision 1761440)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java	(working copy)
@@ -27,6 +27,8 @@
 
 import java.io.Closeable;
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
@@ -34,12 +36,13 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import com.google.common.base.Predicate;
 import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Predicate;
+
 /**
  * An observer that uses a change queue and a background thread to forward
  * content changes to another observer. The mechanism is designed so that
@@ -58,7 +61,7 @@
     /**
      * Signal for the background thread to stop processing changes.
      */
-    private static final ContentChange STOP = new ContentChange(null, null);
+    private static final ContentChange STOP = new ContentChange(null, null, null);
 
     /**
      * The receiving observer being notified off the background thread.
@@ -92,10 +95,12 @@
             Boolean.parseBoolean(System.getProperty("oak.observation.alwaysCollapseExternal", "false"));
 
     private static class ContentChange {
+        private final NodeState noopPreviousRoot;
         private final NodeState root;
         private final CommitInfo info;
         private final long created = System.currentTimeMillis();
-        ContentChange(NodeState root, CommitInfo info) {
+        ContentChange(NodeState noopPreviousRoot, NodeState root, CommitInfo info) {
+            this.noopPreviousRoot = noopPreviousRoot;
             this.root = root;
             this.info = info;
         }
@@ -107,6 +112,10 @@
      */
     private ContentChange last;
 
+    private NodeState previousRoot;
+
+    private boolean previousWasExcluded;
+
     /**
      * Flag to indicate that some content changes were dropped because
      * the queue was full.
@@ -129,6 +138,11 @@
                 try {
                     ContentChange change = queue.poll();
                     if (change != null && change != STOP) {
+                        if (change.noopPreviousRoot != null) {
+                            // a ContentChange that carries a noopPreviousRoot
+                            // indicates a NOOP change
+                            observer.contentChanged(change.noopPreviousRoot, CommitInfo.NOOP_CHANGE);
+                        }
                         observer.contentChanged(change.root, change.info);
                         removed(queue.size(), change.created);
                         currentTask.onComplete(completionHandler);
@@ -182,6 +196,10 @@
         this(observer, executor, 1000);
     }
 
+    protected Observer getObserver() {
+        return observer;
+    }
+    
     /**
      * Called when ever an item has been added to the queue
      * @param queueSize  size of the queue
@@ -272,7 +290,7 @@
         checkState(!stopped);
         checkNotNull(root);
 
-        if (alwaysCollapseExternalEvents && info == null && last != null && last.info == null) {
+        if (!previousWasExcluded && alwaysCollapseExternalEvents && info == null && last != null && last.info == null) {
             // This is an external change. If the previous change was
             // also external, we can drop it from the queue (since external
             // changes in any case can cover multiple commits) to help
@@ -281,35 +299,102 @@
             full = false;
         }
 
-        ContentChange change;
+        final boolean excludedByPrefiltering = isExcludedByPrefiltering(info);
+        final ContentChange change;
+        final NodeState noopPreviousRoot;
+        if (previousWasExcluded) {
+            // skippedPreviousRoot != null indicates a 'NOOP_CHANGE'
+            noopPreviousRoot = previousRoot;
+        } else {
+            // skippedPreviousRoot == null indicates a normal ContentChange
+            noopPreviousRoot = null;
+        }
         if (full) {
             // If the queue is full, some commits have already been skipped
             // so we need to drop the possible local commit information as
             // only external changes can be merged together to larger chunks.
-            change = new ContentChange(root, null);
+            change = new ContentChange(noopPreviousRoot, root, null);
         } else {
-            change = new ContentChange(root, info);
+            change = new ContentChange(noopPreviousRoot, root, info);
         }
 
-        // Try to add this change to the queue without blocking, and
-        // mark the queue as full if there wasn't enough space
-        full = !queue.offer(change);
+        if (!excludedByPrefiltering) {
+            // Try to add this change to the queue without blocking, and
+            // mark the queue as full if there wasn't enough space
+            full = !queue.offer(change);
+    
+            if (!full) {
+                // Keep track of the last change added, so we can do the
+                // compacting of external changes shown above.
+                last = change;
 
-        if (!full) {
-            // Keep track of the last change added, so we can do the
-            // compacting of external changes shown above.
-            last = change;
+                // we passed the previousRoot flag via noopPreviousRoot
+                // in ContentChange to the queue, so we can now reset
+                // the flag here
+                previousWasExcluded = false;
+                previousRoot = null;
+            } else {
+                // if the queue is full now, then we must not update
+                // the previousRoot and previousWasExcluded, as they
+                // must be handled by being added as noopPreviousRoot
+                // in ContentChange to the queue - which they currently
+                // can't - so leave them as is
+            }
+
+            // Set the completion handler on the currently running task. Multiple calls
+            // to onComplete are not a problem here since we always pass the same value.
+            // Thus there is no question as to which of the handlers will effectively run.
+            currentTask.onComplete(completionHandler);
+            added(queue.size());
+        } else {
+            previousWasExcluded = true;
+            previousRoot = root;
         }
-
-        // Set the completion handler on the currently running task. Multiple calls
-        // to onComplete are not a problem here since we always pass the same value.
-        // Thus there is no question as to which of the handlers will effectively run.
-        currentTask.onComplete(completionHandler);
-        added(queue.size());
     }
 
     //------------------------------------------------------------< internal >---
 
+    private boolean isExcludedByPrefiltering(CommitInfo info) {
+        if (!ObservationFilterValidatorProvider.prefilteringEnabled) {
+            return false;
+        }
+        if (info == null) {
+            return false;
+        }
+        Map<String, Object> m = info.getInfo();
+        if (m == null) {
+            return false;
+        }
+        CommitContext commitAttributes = (CommitContext)m.get(CommitContext.NAME);
+        if (commitAttributes == null) {
+            return false;
+        }
+        
+        // if the 'oak.isFiltered' flag is set in the 'oak.commitAttributes'
+        // we skip this entry in the ChangeProcessor.
+        // this results in a 'skip' of changes and ensures the 
+        // 'previousRoot' is again set correctly (see below in else)
+        Boolean observerFiltersEvaluated = (Boolean)commitAttributes.get(
+                ObservationFilterValidatorProvider.COMMIT_CONTEXT_PROPERTY_OBSERVER_FILTERS_EVALUATED);
+        if (observerFiltersEvaluated == null || !observerFiltersEvaluated) {
+            return false;
+        }
+        
+        // then this change (root,info) went through pre-filtering
+        // (ObservationFilterValidatorProvider) and in that case
+        // we should evaluate the COMMIT_CONTEXT_PROPERTY_INTERESTED_OBSERVERS set
+        @SuppressWarnings("unchecked")
+        Set<Observer> interestedObservers = (Set<Observer>)commitAttributes.get(
+                ObservationFilterValidatorProvider.COMMIT_CONTEXT_PROPERTY_INTERESTED_OBSERVERS);
+        if (interestedObservers != null) {
+            return !interestedObservers.contains(observer);
+        } else {
+            // if the set is not at all set, then there is no
+            // observer interested in this event at all, that's good.
+            return true;
+        }
+    }
+    
     private static Logger getLogger(@Nonnull Observer observer) {
         return LoggerFactory.getLogger(checkNotNull(observer).getClass());
     }
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java	(revision 1761440)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java	(working copy)
@@ -45,6 +45,15 @@
     public static final CommitInfo EMPTY =
             new CommitInfo(OAK_UNKNOWN, OAK_UNKNOWN);
 
+    /**
+     * OAK-4796: commitInfo object representing a 'no op ie filtered change'. 
+     * Used to indicate Observers that a particular contentChanged call 
+     * should be ignored (but nevertheless the call is made, in particular
+     * to allow Observers to take note of the new root)
+     */
+    public static final CommitInfo NOOP_CHANGE =
+            new CommitInfo(OAK_UNKNOWN, OAK_UNKNOWN);
+    
     private final String sessionId;
 
     private final String userId;
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ObservationFilterValidatorProvider.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ObservationFilterValidatorProvider.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ObservationFilterValidatorProvider.java	(working copy)
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(immediate = true)
+@Property(name = "type", value = "observationFilterValidator", propertyPrivate = true)
+@Service(ValidatorProvider.class)
+public class ObservationFilterValidatorProvider extends ValidatorProvider implements Observable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ObservationFilterValidatorProvider.class);
+
+    /** flag on a CommitInfo indicating that on this commit the pre-filtering by observers was evaluated */
+    static final String COMMIT_CONTEXT_PROPERTY_OBSERVER_FILTERS_EVALUATED = "oak.observation.observerFiltersEvaluated";
+
+    /** if 'evaluated' flag is set, this set contains the list of interested observers, all others have nothing included by their filter */
+    static final String COMMIT_CONTEXT_PROPERTY_INTERESTED_OBSERVERS = "oak.observation.interestedObservers";
+
+    static final boolean prefilteringEnabled;
+    static final boolean prefilteringTestEnabled;
+
+    static {
+        final String prefilteringEnabledStr = System.getProperty("oak.observation.prefilteringEnabled");
+        boolean prefilteringEnabledBool = true; // default is enabled==false
+        try {
+            if (prefilteringEnabledStr != null && prefilteringEnabledStr.length() != 0) {
+                prefilteringEnabledBool = Boolean.parseBoolean(prefilteringEnabledStr);
+                LOG.info("<clinit> using oak.prefilteringEnabled = " + prefilteringEnabledBool);
+            }
+        } catch(RuntimeException e) {
+            LOG.warn("<clinit> could not parse oak.prefilteringEnabled, using default (" + prefilteringEnabledBool + "): " + e, e);
+        }
+        prefilteringEnabled = prefilteringEnabledBool;
+
+        final String prefilteringTestEnabledStr = System.getProperty("oak.observation.prefilteringTestEnabled");
+        boolean prefilteringTestEnabledBool = false; // default is enabled==true
+        try {
+            if (prefilteringTestEnabledStr != null && prefilteringTestEnabledStr.length() != 0) {
+                prefilteringTestEnabledBool = Boolean.parseBoolean(prefilteringTestEnabledStr);
+                LOG.info("<clinit> using oak.prefilteringTestEnabled = " + prefilteringEnabledBool);
+            }
+        } catch(RuntimeException e) {
+            LOG.warn("<clinit> could not parse oak.prefilteringTestEnabled, using default (" + prefilteringTestEnabledBool + "): " + e, e);
+        }
+        prefilteringTestEnabled = prefilteringTestEnabledBool;
+    }
+
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, policy = ReferencePolicy.DYNAMIC, referenceInterface = Observer.class)
+    private Observer[] observers = new Observer[0];
+    
+    private Set<ObserverValidatorProvider> validatingObservers = new HashSet<ObserverValidatorProvider>();
+    
+    private final Object bindSyncObj = new Object();
+    
+    private final class CompositeValidator implements Validator {
+
+        private final List<Validator> validators;
+        private final CommitInfo infoOrNull;
+
+        CompositeValidator(List<Validator> validators, CommitInfo infoOrNull) {
+            this.validators = validators;
+            this.infoOrNull = infoOrNull;
+        }
+        
+        @Override
+        public void enter(NodeState before, NodeState after) throws CommitFailedException {
+            // first thing, if this has the CommitInfo set (which only the root has),
+            // then set the flag COMMIT_CONTEXT_PROPERTY_OBSERVER_FILTERED on it.
+            // this flag marks the CommitInfo as 'having been pre-filtered'
+            // and that the set COMMIT_CONTEXT_PROPERTY_INTERESTED_OBSERVERS should
+            // be evaluated by down-stream observers (ie BackgroundObserver).
+            if (infoOrNull != null) {
+                CommitContext commitContext = (CommitContext) infoOrNull.getInfo().get(CommitContext.NAME);
+                commitContext.set(COMMIT_CONTEXT_PROPERTY_OBSERVER_FILTERS_EVALUATED, Boolean.TRUE);
+                LOG.info("enter: evaluating filters");
+            }
+            for (Validator validator : validators) {
+                validator.enter(before, after);
+            }
+        }
+
+        @Override
+        public void leave(NodeState before, NodeState after) throws CommitFailedException {
+            for (Validator validator : validators) {
+                validator.leave(before, after);
+            }
+        }
+
+        @Override
+        public void propertyAdded(PropertyState after) throws CommitFailedException {
+            for (Validator validator : validators) {
+                validator.propertyAdded(after);
+            }
+        }
+
+        @Override
+        public void propertyChanged(PropertyState before, PropertyState after) throws CommitFailedException {
+            for (Validator validator : validators) {
+                validator.propertyChanged(before, after);
+            }
+        }
+
+        @Override
+        public void propertyDeleted(PropertyState before) throws CommitFailedException {
+            for (Validator validator : validators) {
+                validator.propertyDeleted(before);
+            }
+        }
+
+        @Override
+        public Validator childNodeAdded(String name, NodeState after) throws CommitFailedException {
+            List<Validator> childValidators = null;
+            for (Validator validator : validators) {
+                Validator aChildValidator = validator.childNodeAdded(name, after);
+                if (aChildValidator != null) {
+                    if (childValidators == null) {
+                        childValidators = new LinkedList<Validator>();
+                    }
+                    childValidators.add(aChildValidator);
+                }
+            }
+            if (childValidators == null) {
+                // very good, we can stop
+                return null;
+            }
+            return new CompositeValidator(childValidators, null);
+        }
+
+        @Override
+        public Validator childNodeChanged(String name, NodeState before, NodeState after) throws CommitFailedException {
+            List<Validator> childValidators = null;
+            for (Validator validator : validators) {
+                Validator aChildValidator = validator.childNodeChanged(name, before, after);
+                if (aChildValidator != null) {
+                    if (childValidators == null) {
+                        childValidators = new LinkedList<Validator>();
+                    }
+                    childValidators.add(aChildValidator);
+                }
+            }
+            if (childValidators == null) {
+                // very good, we can stop
+                return null;
+            }
+            return new CompositeValidator(childValidators, null);
+        }
+
+        @Override
+        public Validator childNodeDeleted(String name, NodeState before) throws CommitFailedException {
+            List<Validator> childValidators = null;
+            for (Validator validator : validators) {
+                Validator aChildValidator = validator.childNodeDeleted(name, before);
+                if (aChildValidator != null) {
+                    if (childValidators == null) {
+                        childValidators = new LinkedList<Validator>();
+                    }
+                    childValidators.add(aChildValidator);
+                }
+            }
+            if (childValidators == null) {
+                // very good, we can stop
+                return null;
+            }
+            return new CompositeValidator(childValidators, null);
+        }
+        
+    }
+    
+    /** test variant going via Jcr.whiteboard */
+    @Override
+    public Closeable addObserver(final Observer observer) {
+        bindObserver(observer);
+        return new Closeable() {
+            
+            @Override
+            public void close() throws IOException {
+                unbindObserver(observer);
+            }
+        };
+    }
+
+    /** runtime variant going via OSGi */
+    protected void bindObserver(final Observer observer) {
+        LOG.info("bindObserver: "+observer);
+        synchronized(bindSyncObj) {
+            List<Observer> al = new LinkedList<Observer>(Arrays.asList(observers));
+            al.add(observer);
+            observers = al.toArray(new Observer[al.size()]);
+            if (observer instanceof ObserverValidatorProvider) {
+                validatingObservers.add((ObserverValidatorProvider) observer);
+            }
+        }
+    }
+    
+    /** runtime variant going via OSGi */
+    protected void unbindObserver(final Observer observer) {
+        LOG.info("unbindObserver: "+observer);
+        synchronized(bindSyncObj) {
+            List<Observer> al = new LinkedList<Observer>(Arrays.asList(observers));
+            al.remove(observer);
+            observers = al.toArray(new Observer[al.size()]);
+            if (observer instanceof ObserverValidatorProvider) {
+                validatingObservers.remove((ObserverValidatorProvider) observer);
+            }
+        }
+    }
+
+    @Override
+    protected Validator getRootValidator(NodeState before, NodeState after, CommitInfo info) {
+        if (!prefilteringEnabled && !prefilteringTestEnabled) {
+            // disabled
+            return null;
+        }
+        List<Validator> validators = null;
+        for (ObserverValidatorProvider filteringObserver : validatingObservers) {
+            Validator v = filteringObserver.getRootValidator(before, after, info);
+            if (v != null) {
+                if (validators == null) {
+                    validators = new LinkedList<Validator>();
+                }
+                validators.add(v);
+            }
+        }
+        if (validators == null) {
+            return null;
+        }
+        return new CompositeValidator(validators, info);
+            
+    }
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ObservationFilterValidatorProvider.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ObserverValidatorProvider.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ObserverValidatorProvider.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ObserverValidatorProvider.java	(working copy)
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+/**
+ * Specialization of an Observer which can also serve
+ * as a ValidatorProvider.
+ * <p>
+ * Note that this 'copies' the ValidatorProvider.getRootValidator
+ * instead of reusing it by subclassing since otherwise
+ * this would have to become a class vs an interface (the latter
+ * is preferred for easier use)
+ */
+public interface ObserverValidatorProvider extends Observer {
+
+    Validator getRootValidator(NodeState before, NodeState after, CommitInfo info);
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ObserverValidatorProvider.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserver.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserver.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserver.java	(working copy)
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import static org.apache.jackrabbit.oak.api.Type.STRING;
+import static org.apache.jackrabbit.oak.spi.state.MoveDetector.SOURCE_PATH;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.apache.jackrabbit.oak.plugins.observation.EventGenerator;
+import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
+import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
+import org.apache.jackrabbit.oak.plugins.tree.impl.TreeConstants;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PrefilteringBackgroundObserver extends BackgroundObserver implements ObserverValidatorProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PrefilteringBackgroundObserver.class);
+
+    private static final String COMMIT_CONTEXT_PROPERTY_TEST_INTERESTED_OBSERVERS = "oak.observation.testInterestedObservers";
+
+    private final AtomicReference<FilterProvider> filterProvider;
+
+    private final class FilteringValidator implements Validator {
+
+        private final FilteringValidator root;
+        private final CommitInfo commitInfo;
+        private boolean anyIncludes;
+        private final EventFilter filter;
+        private final int level;
+        
+        private NodeState afterOrNull;
+        
+        @Override
+        public String toString() {
+            return "FilteringValidator with filter="+filter+" for "+PrefilteringBackgroundObserver.this;
+        }
+
+        public FilteringValidator(EventFilter filter, FilteringValidator root, int level, NodeState afterOrNull) {
+            this.filter = filter;
+            this.commitInfo = root.commitInfo;
+            this.root = root;
+            this.level = level;
+            this.afterOrNull = afterOrNull;
+        }
+
+        public FilteringValidator(EventFilter filter, CommitInfo info) {
+            this.filter = filter;
+            this.commitInfo = info;
+            this.root = this;
+            this.level = 0;
+        }
+
+        private final boolean hasAnyIncludes() {
+            return root.anyIncludes;
+        }
+        
+        private void setHasAnyIncludes(boolean hasAnyIncludes) {
+            root.anyIncludes = hasAnyIncludes;
+        }
+
+        private final void adjustCommitContextSet(final String setName, boolean add) {
+            // CommitContext should always be set on the CommitInfo in a Validator
+            CommitContext commitContext = (CommitContext) commitInfo.getInfo().get(CommitContext.NAME);
+            
+            @SuppressWarnings("unchecked")
+            Set<Observer> interestedObservers = (Set<Observer>) commitContext.get(setName);
+            if (interestedObservers == null) {
+                interestedObservers = new HashSet<Observer>();
+                commitContext.set(setName, interestedObservers);
+            }
+            if (add) {
+                interestedObservers.add(getObserver());
+            }
+        }
+
+        @Override
+        public void enter(NodeState before, NodeState after) throws CommitFailedException {
+            // nada
+        }
+
+        @Override
+        public void leave(NodeState before, NodeState after) throws CommitFailedException {
+            if (level == 0) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.info("leave: hasAnyIncludes: {}", hasAnyIncludes());
+                }
+                // before leaving from the root, lets evaluate the decision
+                if (ObservationFilterValidatorProvider.prefilteringEnabled) {
+                    // prefiltering is enabled
+                    if (hasAnyIncludes()) {
+                        adjustCommitContextSet(
+                                ObservationFilterValidatorProvider.COMMIT_CONTEXT_PROPERTY_INTERESTED_OBSERVERS, 
+                                true);
+                    }
+                }
+                if (ObservationFilterValidatorProvider.prefilteringTestEnabled) {
+                    // 'prefiltering test' is enabled
+                    // unlike for the real 'prefilteringEnabled' case, where the set is only
+                    // updated if there is any observer (to avoid creating a set if there are zero)
+                    // for the test variant this is not the case - in order to be able to 
+                    // distinguish the two cases later in the paranoia check
+                    adjustCommitContextSet(COMMIT_CONTEXT_PROPERTY_TEST_INTERESTED_OBSERVERS, hasAnyIncludes());
+                }
+            }
+        }
+
+        @Override
+        public void propertyAdded(PropertyState after) throws CommitFailedException {
+            if (hasAnyIncludes()) {
+                return;
+            }
+            if (filter.includeAdd(after)) {
+                setHasAnyIncludes(true);
+            }
+        }
+
+        @Override
+        public void propertyChanged(PropertyState before, PropertyState after) throws CommitFailedException {
+            if (hasAnyIncludes()) {
+                return;
+            }
+            if (filter.includeChange(before, after)) {
+                setHasAnyIncludes(true);
+            }
+            if (TreeConstants.OAK_CHILD_ORDER.equals(before.getName())) {
+                if (afterOrNull == null) {
+                    // this should not occur, as :childOrder should only be 
+                    // happening within a parent' childNodeChanged,
+                    // in which case the afterOrNull would be set.
+                    // if that would be still the case, then we 
+                    // have a potentially false positive - however, that's fine.
+                    setHasAnyIncludes(true);
+                    return;
+                }
+                
+                List<String[]> list = EventGenerator.listReorderedChildNodeNames(before, after);
+                if (list != null) {
+                    for (String[] childNames : list) {
+                        // ask the filter if it is interested in this reorder,
+                        // if so, mark as 'has any includes' and done we are
+                        if (filter.includeReorder(
+                                childNames[0], childNames[1],
+                                afterOrNull.getChildNode(childNames[1]))) {
+                            setHasAnyIncludes(true);
+                            return;
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void propertyDeleted(PropertyState before) throws CommitFailedException {
+            if (hasAnyIncludes()) {
+                return;
+            }
+            if (filter.includeDelete(before)) {
+                setHasAnyIncludes(true);
+            }
+        }
+
+        private final Validator childNodeModification(String name, NodeState before, NodeState after, boolean isChange) {
+            EventFilter childFilter = filter.create(name, before, after);
+            if (childFilter == null) {
+                // wohoo, we can stop
+                return null;
+            } else {
+                return new FilteringValidator(childFilter, root, level + 1, isChange ? after : null);
+            }
+        }
+        
+        @Override
+        public Validator childNodeAdded(String name, NodeState after) throws CommitFailedException {
+            if (hasAnyIncludes()) {
+                // make sure we dont dive deeper if we've already found an include
+                return null;
+            }
+            PropertyState sourceProperty = after.getProperty(SOURCE_PATH);
+            if (sourceProperty != null) {
+                String sourcePath = sourceProperty.getValue(STRING);
+                if (filter.includeMove(sourcePath, name, after)) {
+                    setHasAnyIncludes(true);
+                    return null;
+                }
+            }
+            if (filter.includeAdd(name, after)) {
+                setHasAnyIncludes(true);
+                return null;
+            }
+            return childNodeModification(name, EmptyNodeState.EMPTY_NODE, after, false);
+        }
+
+        @Override
+        public Validator childNodeChanged(String name, NodeState before, NodeState after) throws CommitFailedException {
+            if (hasAnyIncludes()) {
+                // make sure we dont dive deeper if we've already found an include
+                return null;
+            }
+            return childNodeModification(name, before, after, true);
+        }
+
+        @Override
+        public Validator childNodeDeleted(String name, NodeState before) throws CommitFailedException {
+            if (hasAnyIncludes()) {
+                // make sure we dont dive deeper if we've already found an include
+                return null;
+            }
+            if (filter.includeDelete(name, before)) {
+                setHasAnyIncludes(true);
+                return null;
+            }
+            //TODO: not sure if the below is really necessary of if we can't just stop here with a return null
+            return childNodeModification(name, before, EmptyNodeState.EMPTY_NODE, false);
+        }
+        
+    }
+
+    /**
+     * TODO: remove this test method after confirmation of further integration testing
+     */
+    public static final Boolean wouldBeExcludedByPrefiltering(CommitInfo info, Observer prefilteringObserver) {
+        // if the 'oak.testInterestedObservers' Set is set in the 'oak.commitAttributes'
+        // we would have skipped this entry in the ChangeProcessor.
+        // the result of this method is to do a paranoia check if that would have been correct
+        if (info == null) {
+            return null;
+        }
+        Map<String, Object> m = info.getInfo();
+        if (m == null) {
+            return null;
+        }
+        CommitContext commitAttributes = (CommitContext)m.get(CommitContext.NAME);
+        if (commitAttributes == null) {
+            return null;
+        }
+        Boolean filtersEvaluatedBool = (Boolean)commitAttributes.get(
+                ObservationFilterValidatorProvider.COMMIT_CONTEXT_PROPERTY_OBSERVER_FILTERS_EVALUATED);
+        if (filtersEvaluatedBool == null) {
+            return null;
+        }
+        @SuppressWarnings("unchecked")
+        Set<Observer> testInterestedObservers = (Set<Observer>)commitAttributes.get(
+                PrefilteringBackgroundObserver.COMMIT_CONTEXT_PROPERTY_TEST_INTERESTED_OBSERVERS);
+        if (testInterestedObservers != null) {
+            return !testInterestedObservers.contains(prefilteringObserver);
+        } else {
+            // then the test mechanism is disabled
+            return null;
+        }
+    }
+
+    protected PrefilteringBackgroundObserver(Observer observer, Executor executor, int queueLength,
+            AtomicReference<FilterProvider> filterProvider) {
+        super(observer, executor, queueLength);
+        this.filterProvider = filterProvider;
+    }
+
+    @Override
+    public Validator getRootValidator(NodeState before, NodeState after, CommitInfo info) {
+        // NOTE that the PrefilteringBackgroundObserver can't do internal/external event
+        // filtering - as it doesn't have the userId. That must be done in a subclass!
+        return new FilteringValidator(filterProvider.get().getFilter(before, after), info);
+    }
+    
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserver.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java	(revision 0)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java	(working copy)
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.oak.core.SimpleCommitContext;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.Test;
+
+public class PrefilteringBackgroundObserverTest {
+    
+    private List<Runnable> runnableQ;
+    private ExecutorService executor;
+    private CompositeObserver compositeObserver;
+    private List<ContentChanged> received;
+    private EnqueuingObserver enqueuingObserver;
+    private BackgroundObserver backgroundObserver;
+    private Map<String,Object> filtersEvaluatedMapWithNullObservers;
+    private Map<String,Object> filtersEvaluatedMapWithEmptyObservers;
+    private Map<String,Object> filtersEvaluatedMapWithNonEmptyObservers;
+
+    public void init(int queueLength) throws Exception {
+        runnableQ = new LinkedList<Runnable>();
+        executor = new EnqueuingExecutorService(runnableQ);
+        compositeObserver = new CompositeObserver();
+        received = new LinkedList<ContentChanged>();
+        enqueuingObserver = new EnqueuingObserver(received);
+
+        filtersEvaluatedMapWithNullObservers = new HashMap<>();
+        filtersEvaluatedMapWithNullObservers.put(ObservationFilterValidatorProvider.COMMIT_CONTEXT_PROPERTY_OBSERVER_FILTERS_EVALUATED, Boolean.TRUE);
+
+        filtersEvaluatedMapWithEmptyObservers = new HashMap<>();
+        filtersEvaluatedMapWithEmptyObservers.put(ObservationFilterValidatorProvider.COMMIT_CONTEXT_PROPERTY_OBSERVER_FILTERS_EVALUATED, Boolean.TRUE);
+        final Set<Observer> emptyInterestedObservers = new HashSet<Observer>();
+        filtersEvaluatedMapWithEmptyObservers.put(ObservationFilterValidatorProvider.COMMIT_CONTEXT_PROPERTY_INTERESTED_OBSERVERS, emptyInterestedObservers);
+        
+        filtersEvaluatedMapWithNonEmptyObservers = new HashMap<>();
+        filtersEvaluatedMapWithNonEmptyObservers.put(ObservationFilterValidatorProvider.COMMIT_CONTEXT_PROPERTY_OBSERVER_FILTERS_EVALUATED, Boolean.TRUE);
+        final Set<Observer> nonEmptyInterestedObservers = new HashSet<Observer>();
+        filtersEvaluatedMapWithNonEmptyObservers.put(ObservationFilterValidatorProvider.COMMIT_CONTEXT_PROPERTY_INTERESTED_OBSERVERS, nonEmptyInterestedObservers);
+
+        backgroundObserver = new BackgroundObserver(enqueuingObserver, executor, queueLength);
+        compositeObserver.addObserver(backgroundObserver);
+        nonEmptyInterestedObservers.add(enqueuingObserver);
+    }
+
+    private final class EnqueuingObserver implements Observer {
+        private final List<ContentChanged> received;
+
+        private EnqueuingObserver(List<ContentChanged> received) {
+            this.received = received;
+        }
+
+        @Override
+        public void contentChanged(@Nonnull final NodeState root, @Nullable CommitInfo info) {
+            received.add(new ContentChanged(root, info));
+        }
+    }
+
+    private final class EnqueuingExecutorService extends AbstractExecutorService {
+        private final List<Runnable> runnableQ;
+
+        private EnqueuingExecutorService(List<Runnable> runnableQ) {
+            this.runnableQ = runnableQ;
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            runnableQ.add(command);
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            throw new IllegalStateException("nyi");
+        }
+
+        @Override
+        public void shutdown() {
+            throw new IllegalStateException("nyi");
+        }
+
+        @Override
+        public boolean isTerminated() {
+            throw new IllegalStateException("nyi");
+        }
+
+        @Override
+        public boolean isShutdown() {
+            throw new IllegalStateException("nyi");
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            throw new IllegalStateException("nyi");
+        }
+    }
+
+    class ContentChanged {
+        NodeState root;
+        CommitInfo info;
+        ContentChanged(NodeState root, CommitInfo info) {
+            this.root = root;
+            this.info = info;
+        }
+    }
+    
+    private static CommitInfo newCommitInfo(String userId) {
+        return new CommitInfo("no-session", userId);
+    }
+
+    private static CommitInfo newCommitInfo(String userId, Map<String,Object> commitAttributesMap) {
+        if (commitAttributesMap == null) {
+            return newCommitInfo(userId);
+        }
+        CommitContext cc = new SimpleCommitContext();
+        for (String aKey : commitAttributesMap.keySet()) {
+            Object value = commitAttributesMap.get(aKey);
+            cc.set(aKey, value);
+        }
+        Map<String,Object> infoMap = new HashMap<String,Object>();
+        infoMap.put(CommitContext.NAME, cc);
+        return new CommitInfo("no-session", userId, infoMap);
+    }
+    
+    private static void executeRunnables(final List<Runnable> runnableQ, int num) {
+        for(int i=0; i<num; i++) {
+            for (Runnable runnable : new ArrayList<Runnable>(runnableQ)) {
+                runnable.run();
+            }
+        }
+    }
+
+    private static NodeState p(int k) {
+        return EMPTY_NODE.builder().setProperty("p", k).getNodeState();
+    }
+
+    @Test
+    public void testFlipping() throws Exception {
+        final int queueLength = 2000;
+        init(queueLength);
+
+        // initialize observer with an initial contentChanged
+        // (see ChangeDispatcher#addObserver)
+        {
+            compositeObserver.contentChanged(p(-1), null);
+        }
+        // Part 1 : first run with filtersEvaluatedMapWithEmptyObservers - empty or null shouldn't matter, it's excluded in both cases
+        for (int k = 0; k < 1000; k++) {
+            CommitInfo info;
+            if (k%2==1) {
+                info = newCommitInfo("user-"+k);
+            } else {
+                info = newCommitInfo("user-"+k, filtersEvaluatedMapWithEmptyObservers);
+            }
+            final NodeState p = p(k);
+            compositeObserver.contentChanged(p, info);
+            if (k%10 == 0) {
+                executeRunnables(runnableQ, 10);
+            }
+        }
+        executeRunnables(runnableQ, 10);
+        
+        assertEquals(1001, received.size());
+        assertEquals(500, countNoops(received));
+        
+        // Part 2 : run with filtersEvaluatedMapWithNullObservers - empty or null shouldn't matter, it's excluded in both cases
+        received.clear();
+        for (int k = 0; k < 1000; k++) {
+            CommitInfo info;
+            if (k%2==1) {
+                info = newCommitInfo("user-"+k);
+            } else {
+                info = newCommitInfo("user-"+k, filtersEvaluatedMapWithEmptyObservers);
+            }
+            final NodeState p = p(k);
+            compositeObserver.contentChanged(p, info);
+            if (k%10 == 0) {
+                executeRunnables(runnableQ, 10);
+            }
+        }
+        executeRunnables(runnableQ, 10);
+        
+        assertEquals(1000, received.size());
+        assertEquals(500, countNoops(received));
+        
+        // Part 3 : unlike the method name suggests, this variant tests with the filter disabled, so should receive all events normally
+        received.clear();
+        for (int k = 0; k < 1000; k++) {
+            CommitInfo info;
+            if (k%2==1) {
+                info = newCommitInfo("user-"+k);
+            } else {
+                info = newCommitInfo("user-"+k, filtersEvaluatedMapWithNonEmptyObservers);
+            }
+            final NodeState p = p(k);
+            compositeObserver.contentChanged(p, info);
+            if (k%10 == 0) {
+                executeRunnables(runnableQ, 10);
+            }
+        }
+        executeRunnables(runnableQ, 10);
+        
+        assertEquals(1000, received.size());
+        assertEquals(0, countNoops(received));
+    }
+
+    private int countNoops(List<ContentChanged> received) {
+        int noopCnt = 0;
+        for (ContentChanged contentChanged : received) {
+            if (contentChanged.info == CommitInfo.NOOP_CHANGE) {
+                noopCnt++;
+            }
+        }
+        return noopCnt;
+    }
+
+    @Test
+    public void testFlipping2() throws Exception {
+        doTestFullQueue(6, 
+                new TestPattern(false, 1, true, 1, 0),
+                new TestPattern(true, 5, true, 0, 0),
+                new TestPattern(false, 2, true, 2, 1),
+                new TestPattern(true, 1, true, 0, 0),
+                new TestPattern(false, 2, true, 2, 1));
+    }
+    
+    @Test
+    public void testQueueNotFull() throws Exception {
+        doTestFullQueue(20, 
+                // start: empty queue
+                new TestPattern(true, 1000, false, 0, 0),
+                // here: still empty, just the previousRoot is set to remember above NOOPs
+                new TestPattern(false, 5, false, 0, 0),
+                // here: 5 changes are in the queue, the queue fits 20, way to go
+                new TestPattern(true, 500, false, 0, 0),
+                // still 5 in the queue
+                new TestPattern(false, 5, false, 0, 0),
+                // now we added 2, queue still not full
+                new TestPattern(true, 0 /* only flush*/, true, 10, 2)
+                );
+    }
+    
+    @Test
+    public void testIncludeOnQueueFull() throws Exception {
+        doTestFullQueue(6, 
+                // start: empty queue
+                new TestPattern(true, 1000, false, 0, 0),
+                // here: still empty, just the previousRoot is set to remember above NOOPs
+                new TestPattern(false, 5, false, 0, 0),
+                // here: 5 changes are in the queue, the queue fits 6, so almost full
+                new TestPattern(true, 500, false, 0, 0),
+                // still 5 in the queue, of 6
+                new TestPattern(false, 5, false, 0, 0),
+                // now we added 2, so the queue got full
+                new TestPattern(true, 0 /* only flush*/, true, 6, 2)
+                );
+    }
+
+    @Test
+    public void testExcludeOnQueueFull() throws Exception {
+        doTestFullQueue(3, 
+                // start: empty queue
+                new TestPattern(true, 1, false, 0, 0),
+                // here: still empty, just the previousRoot is set to remember above NOOPs
+                new TestPattern(false, 3, false, 0, 0),
+                // here: 3 changes are in the queue, the queue fits 3, so it just got full now
+                new TestPattern(true, 1, false, 0, 0),
+                // still full but it's ignored, so doesn't have any queue length effect
+                new TestPattern(false, 3, false, 0, 0),
+                // adding 3 will not work, it will result in an overflow entry
+                new TestPattern(true, 0 /* only flush*/, true, 3, 1), //TODO: we would expect 4+2=6, but apparently the overflow entry doesn't get flushed without adding a new one
+                new TestPattern(false, 1, false, 0, 0),
+                new TestPattern(true, 0 /* only flush*/, true, 1, 1)
+                );
+    }
+
+    class TestPattern {
+        final boolean flush;
+        final boolean excluded;
+        final int numEvents;
+        final int expectedNumEvents;
+        final int expectedNumNoops;
+        TestPattern(boolean excluded, int numEvents, boolean flush, int expectedNumEvents, int expectedNumNoops) {
+            this.flush = flush;
+            this.excluded = excluded;
+            this.numEvents = numEvents;
+            this.expectedNumEvents = expectedNumEvents;
+            this.expectedNumNoops = expectedNumNoops;
+        }
+        
+        @Override
+        public String toString() {
+            return "excluded="+excluded+", numEvents="+numEvents+", flush="+flush+", expectedNumEvents="+expectedNumEvents+", expectedNumNoops="+expectedNumNoops;
+        }
+    }
+    
+    private void doTestFullQueue(int queueLength, TestPattern... testPatterns) throws Exception {
+        init(queueLength);
+
+        // initialize observer with an initial contentChanged
+        // (see ChangeDispatcher#addObserver)
+        {
+            compositeObserver.contentChanged(p(-1), null);
+        }
+        // remove above first event right away
+        executeRunnables(runnableQ, 5);
+        received.clear();
+        
+        int k = 0;
+        int loopCnt = 0;
+        for (TestPattern testPattern : testPatterns) {
+            k++;
+            for(int i=0; i<testPattern.numEvents; i++) {
+                CommitInfo info;
+                if (!testPattern.excluded) {
+                    info = newCommitInfo("user-"+k);
+                } else {
+                    info = newCommitInfo("user-"+k, filtersEvaluatedMapWithEmptyObservers);
+                }
+                k++;
+                compositeObserver.contentChanged(p(k), info);
+            }
+            if (testPattern.flush) {
+                executeRunnables(runnableQ, testPattern.numEvents + testPattern.expectedNumEvents + testPattern.expectedNumNoops + 10);
+            }
+            assertEquals("loopCnt="+loopCnt, testPattern.expectedNumEvents + testPattern.expectedNumNoops, received.size());
+            int numNoops = countNoops(received);
+            assertEquals("loopCnt="+loopCnt, testPattern.expectedNumNoops, numNoops);
+            received.clear();
+            loopCnt++;
+        }
+    }
+
+}

Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java	(revision 1761440)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java	(working copy)
@@ -53,6 +53,7 @@
 import org.apache.jackrabbit.oak.spi.commit.CompositeConflictHandler;
 import org.apache.jackrabbit.oak.spi.commit.Editor;
 import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.commit.ObservationFilterValidatorProvider;
 import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.commit.PartialConflictHandler;
 import org.apache.jackrabbit.oak.spi.lifecycle.RepositoryInitializer;
@@ -120,6 +121,7 @@
             with(new NamespaceEditorProvider());
             with(new TypeEditorProvider());
             with(new ConflictValidatorProvider());
+            with(new ObservationFilterValidatorProvider());
 
             with(new ReferenceEditorProvider());
             with(new ReferenceIndexProvider());
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java	(revision 1761440)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java	(working copy)
@@ -53,6 +53,8 @@
 import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.commit.PrefilteringBackgroundObserver;
+import org.apache.jackrabbit.oak.spi.commit.Validator;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
 import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
@@ -89,6 +91,13 @@
      * kicks in.
      */
     public static final int MAX_DELAY;
+    
+    /**
+     * number of events in the observation queue after which prefiltering will
+     * become active - if fewer than this many events are in the queue, prefiltering
+     * is not applied.
+     */
+    public static final int PREFILTERING_LIMIT;
 
     // OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using System.properties for now
     static {
@@ -114,6 +123,18 @@
         }
         DELAY_THRESHOLD = delayThreshold;
         MAX_DELAY = maxDelay;
+
+        final String prefilteringLimitStr = System.getProperty("oak.observation.prefilteringLimit");
+        int prefilteringLimit = 20; /* default is 20 */
+        try{
+            if (prefilteringLimitStr != null && prefilteringLimitStr.length() != 0) {
+                prefilteringLimit = Integer.parseInt(prefilteringLimitStr);
+                LOG.info("<clinit> using oak.observation.prefilteringLimit of " + prefilteringLimit);
+            }
+        } catch(RuntimeException e) {
+            LOG.warn("<clinit> could not parse oak.observation.prefilteringLimitd, using default(" + prefilteringLimit + "): " + e, e);
+        }
+        PREFILTERING_LIMIT = prefilteringLimit;
     }
     
     private static final AtomicInteger COUNTER = new AtomicInteger();
@@ -185,16 +206,16 @@
         checkState(registration == null, "Change processor started already");
         final WhiteboardExecutor executor = new WhiteboardExecutor();
         executor.start(whiteboard);
-        final BackgroundObserver observer = createObserver(executor);
+        backgroundObserver = createObserver(executor);
         listenerId = COUNTER.incrementAndGet() + "";
         Map<String, String> attrs = ImmutableMap.of(LISTENER_ID, listenerId);
         String name = tracker.toString();
         registration = new CompositeRegistration(
-            registerObserver(whiteboard, observer),
+            registerObserver(whiteboard, backgroundObserver),
             registerMBean(whiteboard, EventListenerMBean.class,
                     tracker.getListenerMBean(), "EventListener", name, attrs),
             registerMBean(whiteboard, BackgroundObserverMBean.class,
-                    observer.getMBean(), BackgroundObserverMBean.TYPE, name, attrs),
+                    backgroundObserver.getMBean(), BackgroundObserverMBean.TYPE, name, attrs),
             //TODO If FilterProvider gets changed later then MBean would need to be
             // re-registered
             registerMBean(whiteboard, FilterConfigMBean.class,
@@ -202,7 +223,7 @@
             new Registration() {
                 @Override
                 public void unregister() {
-                    observer.close();
+                    backgroundObserver.close();
                 }
             },
             new Registration() {
@@ -221,14 +242,16 @@
     }
 
     private BackgroundObserver createObserver(final WhiteboardExecutor executor) {
-        return new BackgroundObserver(this, executor, queueLength) {
+        return new PrefilteringBackgroundObserver(this, executor, queueLength, filterProvider) {
             private volatile long delay;
             private volatile boolean blocking;
+            private volatile int lastQueueSize;
 
             @Override
             protected void added(int queueSize) {
                 maxQueueLength.recordValue(queueSize);
                 tracker.recordQueueLength(queueSize);
+                lastQueueSize = queueSize;
 
                 if (queueSize == queueLength) {
                     if (commitRateLimiter != null) {
@@ -278,12 +301,35 @@
             protected void removed(int queueSize, long created) {
                 maxQueueLength.recordValue(queueSize);
                 tracker.recordQueueLength(queueSize, created);
+                lastQueueSize = queueSize;
             }
+            
+            @Override
+            public String toString() {
+                return "PrefilteringBackgroundObserver for "+ChangeProcessor.this;
+            }
+            
+            @Override
+            public Validator getRootValidator(NodeState before, NodeState after, CommitInfo info) {
+                if (lastQueueSize < PREFILTERING_LIMIT) {
+                    // only do prefiltering if the queue is larger than a configurable limit
+                    // to keep commit cost low for the majority of commits, only add cost
+                    // when system is under pressure.
+                    return null;
+                }
+                if (!filterProvider.get().includeCommit(contentSession.toString(), info)) {
+                    // early filtering, perfect, that's the best!
+                    return null;
+                }
+                return super.getRootValidator(before, after, info);
+            }
+
         };
     }
 
     private final Monitor runningMonitor = new Monitor();
     private final RunningGuard running = new RunningGuard(runningMonitor);
+    private BackgroundObserver backgroundObserver;
 
     /**
      * Try to stop this change processor if running. This method will wait
@@ -333,10 +379,27 @@
 
     @Override
     public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
-        if (previousRoot != null) {
+        // OAK-4796 : if the CommitInfo is a NOOP, then we should skip
+        // this contentChanged and instead just remember the root as the new previousRoot
+        // The NOOP is set by the BackgroundObserver when it finds out
+        // that the ObserationFilterValidatorProvider (and its Validators)
+        // have figured out that this Observer's filter would not result in any event
+        final boolean noopChange = info == CommitInfo.NOOP_CHANGE;
+
+        // OAK-4796 testing mechanism: the pre-filtering is either activated
+        // or in test-mode - in which case instead of a NOOP, the normal 
+        // CommitInfo is passed, but it contains a CommitAttribute containing
+        // the interested providers - against which the real filter is then evaluated.
+        // So if the real filter finds an event, it checks if the pre-filtering 
+        // would have correctly detected this - and vice-verca - and warns if that's
+        // not the case.
+        final Boolean wouldBeExcluded = PrefilteringBackgroundObserver.wouldBeExcludedByPrefiltering(
+                info, backgroundObserver);
+        if (previousRoot != null && !noopChange) {
             try {
                 long start = PERF_LOGGER.start();
                 FilterProvider provider = filterProvider.get();
+                boolean onEventInvoked = false;
                 // FIXME don't rely on toString for session id
                 if (provider.includeCommit(contentSession.toString(), info)) {
                     EventFilter filter = provider.getFilter(previousRoot, root);
@@ -349,6 +412,7 @@
                     if (hasEvents && runningMonitor.enterIf(running)) {
                         try {
                             CountingIterator countingEvents = new CountingIterator(events);
+                            onEventInvoked = true;
                             eventListener.onEvent(countingEvents);
                             countingEvents.updateCounters(eventCount, eventDuration);
                         } finally {
@@ -356,6 +420,14 @@
                         }
                     }
                 }
+                if (wouldBeExcluded != null) {
+                    if (wouldBeExcluded && onEventInvoked) {
+                        LOG.warn("contentChanged: delivering event which would have been filtered, this="+this+", listener="+eventListener);
+                    } else if (!wouldBeExcluded && !onEventInvoked) {
+                        LOG.info("contentChanged: no event to deliver but we didn't filter this one, this="+this+", listener="+eventListener);
+                    }
+                }
+                if (onEventInvoked)
                 PERF_LOGGER.end(start, 100,
                         "Generated events (before: {}, after: {})",
                         previousRoot, root);
