Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java	(revision 1763448)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java	(working copy)
@@ -30,16 +30,18 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import com.google.common.base.Predicate;
 import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Predicate;
+
 /**
  * An observer that uses a change queue and a background thread to forward
  * content changes to another observer. The mechanism is designed so that
@@ -56,9 +58,18 @@
 public class BackgroundObserver implements Observer, Closeable {
 
     /**
+     * OAK-4916: commitInfo object representing a 'no op ie filtered change'. 
+     * Used to indicate Observers that a particular contentChanged call 
+     * should be ignored (but nevertheless the call is made, in particular
+     * to allow Observers to take note of the new root)
+     */
+    public static final CommitInfo NOOP_CHANGE =
+            new CommitInfo(CommitInfo.OAK_UNKNOWN, CommitInfo.OAK_UNKNOWN);
+
+    /**
      * Signal for the background thread to stop processing changes.
      */
-    private static final ContentChange STOP = new ContentChange(null, null);
+    private static final ContentChange STOP = new ContentChange(null, null, null);
 
     /**
      * The receiving observer being notified off the background thread.
@@ -92,10 +103,12 @@
             Boolean.parseBoolean(System.getProperty("oak.observation.alwaysCollapseExternal", "false"));
 
     private static class ContentChange {
+        private final NodeState noopPreviousRoot;
         private final NodeState root;
         private final CommitInfo info;
         private final long created = System.currentTimeMillis();
-        ContentChange(NodeState root, CommitInfo info) {
+        ContentChange(NodeState noopPreviousRoot, NodeState root, CommitInfo info) {
+            this.noopPreviousRoot = noopPreviousRoot;
             this.root = root;
             this.info = info;
         }
@@ -107,6 +120,12 @@
      */
     private ContentChange last;
 
+    private NodeState noopPreviousRoot;
+    
+    private NodeState previousRoot;
+
+    private boolean previousWasExcluded;
+
     /**
      * Flag to indicate that some content changes were dropped because
      * the queue was full.
@@ -129,6 +148,11 @@
                 try {
                     ContentChange change = queue.poll();
                     if (change != null && change != STOP) {
+                        if (change.noopPreviousRoot != null) {
+                            // a ContentChange that carries a noopPreviousRoot
+                            // indicates a NOOP change
+                            observer.contentChanged(change.noopPreviousRoot, NOOP_CHANGE);
+                        }
                         observer.contentChanged(change.root, change.info);
                         removed(queue.size(), change.created);
                         currentTask.onComplete(completionHandler);
@@ -272,7 +296,7 @@
         checkState(!stopped);
         checkNotNull(root);
 
-        if (alwaysCollapseExternalEvents && info == null && last != null && last.info == null) {
+        if (!previousWasExcluded && alwaysCollapseExternalEvents && info == null && last != null && last.info == null) {
             // This is an external change. If the previous change was
             // also external, we can drop it from the queue (since external
             // changes in any case can cover multiple commits) to help
@@ -281,36 +305,100 @@
             full = false;
         }
 
-        ContentChange change;
+        // excluded: allows to exclude an element - only supported when
+        // queue is not full (as queue full means 'compaction' and that
+        // doesn't work with some elements excluded and some not, potentially).
+        final boolean excluded = !full && isExcluded(previousRoot, root, info);
+        final ContentChange change;
+        final NodeState noopPreviousRoot;
+        if (previousWasExcluded) {
+            // skippedPreviousRoot != null indicates a 'NOOP_CHANGE'
+            noopPreviousRoot = this.noopPreviousRoot;
+        } else {
+            // skippedPreviousRoot == null indicates a normal ContentChange
+            noopPreviousRoot = null;
+        }
         if (full) {
             // If the queue is full, some commits have already been skipped
             // so we need to drop the possible local commit information as
             // only external changes can be merged together to larger chunks.
-            change = new ContentChange(root, null);
+            change = new ContentChange(noopPreviousRoot, root, null);
         } else {
-            change = new ContentChange(root, info);
+            change = new ContentChange(noopPreviousRoot, root, info);
         }
 
-        // Try to add this change to the queue without blocking, and
-        // mark the queue as full if there wasn't enough space
-        full = !queue.offer(change);
+        if (!excluded) {
+            // Try to add this change to the queue without blocking, and
+            // mark the queue as full if there wasn't enough space
+            full = !queue.offer(change);
+    
+            if (!full) {
+                // Keep track of the last change added, so we can do the
+                // compacting of external changes shown above.
+                last = change;
 
-        if (!full) {
-            // Keep track of the last change added, so we can do the
-            // compacting of external changes shown above.
-            last = change;
+                // we passed the previousRoot flag via noopPreviousRoot
+                // in ContentChange to the queue, so we can now reset
+                // the flag here
+                this.previousWasExcluded = false;
+                this.noopPreviousRoot = null;
+            } else {
+                // if the queue is full now, then we must not update
+                // the previousRoot and previousWasExcluded, as they
+                // must be handled by being added as noopPreviousRoot
+                // in ContentChange to the queue - which they currently
+                // can't - so leave them as is
+            }
+
+            // Set the completion handler on the currently running task. Multiple calls
+            // to onComplete are not a problem here since we always pass the same value.
+            // Thus there is no question as to which of the handlers will effectively run.
+            currentTask.onComplete(completionHandler);
+            added(queue.size());
+        } else {
+            this.previousWasExcluded = true;
+            this.noopPreviousRoot = root;
         }
+        this.previousRoot = root;
+    }
 
-        // Set the completion handler on the currently running task. Multiple calls
-        // to onComplete are not a problem here since we always pass the same value.
-        // Thus there is no question as to which of the handlers will effectively run.
-        currentTask.onComplete(completionHandler);
-        added(queue.size());
+    /**
+     * OAK-4916: Hook for subclasses : by overwriting and implementing isExcluded 
+     * subclasses can choose to filter out a particular commit - ie they will not 
+     * receive a contentChanged later on in the registered Observer.contentChanged.
+     * More precisely: excluded commits (one or many consecutive ones) will be replaced
+     * with a single 'fast-forward == noop' call to contentChanged with the 
+     * CommitInfo set as NOOP_CHANGED. That is: if a downstream Observer overwrites
+     * this isExcluded method it must be capable of handling NOOP_CHANGED CommitInfos
+     * and treat them such that the change is ignored but the pointer (previousRoot)
+     * is nevertheless forwarded. 
+     */
+    protected boolean isExcluded(NodeState before, NodeState after, CommitInfo info) {
+        return false;
     }
-
+    
     //------------------------------------------------------------< internal >---
 
     private static Logger getLogger(@Nonnull Observer observer) {
         return LoggerFactory.getLogger(checkNotNull(observer).getClass());
     }
+    
+    /** FOR TESTING ONLY 
+     * @throws InterruptedException **/
+    protected boolean waitUntilStopped(int timeout, TimeUnit unit) throws InterruptedException {
+    	long done = System.currentTimeMillis() + unit.toMillis(timeout);
+    	synchronized(this) {
+    	    queue.add(STOP);
+            currentTask.onComplete(completionHandler);
+    	}
+    	while(done > System.currentTimeMillis()) {
+    		synchronized(this) {
+    			if (queue.size() == 0) {
+    				return true;
+    			}
+    			wait(1);
+    		}
+    	}
+    	return false;
+    }
 }
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java	(revision 1763448)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java	(working copy)
@@ -20,30 +20,42 @@
 package org.apache.jackrabbit.oak.spi.commit;
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.io.Closeable;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import com.google.common.collect.Lists;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.After;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
+import junit.framework.AssertionFailedError;
+
 public class BackgroundObserverTest {
     private static final CommitInfo COMMIT_INFO = new CommitInfo("no-session", null);
     public static final int CHANGE_COUNT = 1024;
 
     private final List<Runnable> assertions = Lists.newArrayList();
     private CountDownLatch doneCounter;
+    private final List<Closeable> closeables = Lists.newArrayList();
 
     /**
      * Assert that each observer of many running concurrently sees the same
@@ -125,5 +137,261 @@
             }
         }, executor, queueLength);
     }
+    
+    class ExcludingBackgroundObserver extends BackgroundObserver {
 
+        private boolean excludeNext = false;
+
+        public ExcludingBackgroundObserver(Observer observer, Executor executor, int queueSize) {
+            super(observer, executor, queueSize);
+        }
+
+        void excludeNext(boolean excludeNext) {
+            this.excludeNext = excludeNext;
+        }
+
+        @Override
+        protected boolean isExcluded(NodeState before, NodeState after, CommitInfo info) {
+            boolean isExcluded = excludeNext;
+            excludeNext = false;
+            return isExcluded;
+        }
+
+    }
+
+    class Pair {
+        private final NodeState before;
+        private final NodeState after;
+
+        Pair(NodeState before, NodeState after) {
+            this.before = before;
+            this.after = after;
+        }
+    }
+
+    class Recorder implements Observer {
+
+        List<Pair> includedChanges = new LinkedList<Pair>();
+        private boolean pause;
+        private boolean pausing;
+        private NodeState lastRoot;
+
+        @Override
+        public void contentChanged(NodeState root, CommitInfo info) {
+            if (info != BackgroundObserver.NOOP_CHANGE) {
+                includedChanges.add(new Pair(lastRoot, root));
+            }
+            lastRoot = root;
+            synchronized (this) {
+                try {
+                    while (pause) {
+                        pausing = true;
+                        this.notifyAll();
+                        try {
+                            this.wait();
+                        } catch (InterruptedException e) {
+                            // should not happen
+                        }
+                    }
+                } finally {
+                    pausing = false;
+                    this.notifyAll();
+                }
+            }
+        }
+
+        public synchronized void pause() {
+            this.pause = true;
+        }
+
+        public synchronized void unpause() {
+            this.pause = false;
+            this.notifyAll();
+        }
+
+        public boolean waitForPausing(int timeout, TimeUnit unit) throws InterruptedException {
+            final long done = System.currentTimeMillis() + unit.toMillis(timeout);
+            synchronized (this) {
+                while (!pausing && done > System.currentTimeMillis()) {
+                    this.wait();
+                }
+                return pausing;
+            }
+        }
+
+    }
+
+    class NodeStateGenerator {
+        Random r = new Random(1232131); // seed: repeatable tests
+        NodeBuilder builder = EMPTY_NODE.builder();
+
+        NodeState next() {
+            builder.setProperty("p", r.nextInt());
+            NodeState result = builder.getNodeState();
+            builder = result.builder();
+            return result;
+        }
+    }
+
+    private void assertMatches(String msg, List<Pair> expected, List<Pair> actual) {
+        assertEquals("size mismatch. msg=" + msg, expected.size(), actual.size());
+        for (int i = 0; i < expected.size(); i++) {
+            assertSame("mismatch of before at pos=" + i + ", msg=" + msg, expected.get(i).before, actual.get(i).before);
+            assertSame("mismatch of after at pos=" + i + ", msg=" + msg, expected.get(i).after, actual.get(i).after);
+        }
+    }
+
+    @After
+    public void shutDown() throws Exception {
+        for (Closeable closeable : closeables) {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                throw new AssertionFailedError(e.getMessage());
+            }
+        }
+    }
+
+    @Test
+    public void testExcludedAllCommits() throws Exception {
+        Recorder recorder = new Recorder();
+        ExecutorService executor = newSingleThreadExecutor();
+        ExcludingBackgroundObserver ebo = new ExcludingBackgroundObserver(recorder, executor, 5);
+        closeables.add(ebo);
+        List<Pair> expected = new LinkedList<Pair>();
+        NodeStateGenerator generator = new NodeStateGenerator();
+        NodeState first = generator.next();
+        expected.add(new Pair(null, first));
+        ebo.contentChanged(first, CommitInfo.EMPTY);
+        for (int i = 0; i < 100000; i++) {
+            ebo.excludeNext(true);
+            ebo.contentChanged(generator.next(), CommitInfo.EMPTY);
+        }
+        assertTrue("testExcludedAllCommits", ebo.waitUntilStopped(5, TimeUnit.SECONDS));
+        assertMatches("testExcludedAllCommits", expected, recorder.includedChanges);
+    }
+
+    @Test
+    public void testNoExcludedCommits() throws Exception {
+        Recorder recorder = new Recorder();
+        ExecutorService executor = newSingleThreadExecutor();
+        ExcludingBackgroundObserver ebo = new ExcludingBackgroundObserver(recorder, executor, 10002);
+        closeables.add(ebo);
+        List<Pair> expected = new LinkedList<Pair>();
+        NodeStateGenerator generator = new NodeStateGenerator();
+        NodeState first = generator.next();
+        expected.add(new Pair(null, first));
+        ebo.contentChanged(first, CommitInfo.EMPTY);
+        NodeState previous = first;
+        for (int i = 0; i < 10000; i++) {
+            ebo.excludeNext(false);
+            NodeState next = generator.next();
+            expected.add(new Pair(previous, next));
+            previous = next;
+            ebo.contentChanged(next, CommitInfo.EMPTY);
+        }
+        assertTrue("testNoExcludedCommits", ebo.waitUntilStopped(5, TimeUnit.SECONDS));
+        assertMatches("testNoExcludedCommits", expected, recorder.includedChanges);
+    }
+
+    @Test
+    public void testExcludeCommitsWithFullQueue() throws Exception {
+        Recorder recorder = new Recorder();
+        ExecutorService executor = newSingleThreadExecutor();
+        ExcludingBackgroundObserver ebo = new ExcludingBackgroundObserver(recorder, executor, 2);
+        closeables.add(ebo);
+        List<Pair> expected = new LinkedList<Pair>();
+        NodeStateGenerator generator = new NodeStateGenerator();
+        recorder.pause();
+
+        // the first one will directly go to the recorder
+        NodeState first = generator.next();
+        expected.add(new Pair(null, first));
+        ebo.contentChanged(first, CommitInfo.EMPTY);
+
+        assertTrue("observer did not get called (yet?)", recorder.waitForPausing(5, TimeUnit.SECONDS));
+
+        // this one will the queued as #1
+        NodeState second = generator.next();
+        expected.add(new Pair(first, second));
+        ebo.contentChanged(second, CommitInfo.EMPTY);
+
+        // this one will the queued as #2
+        NodeState third = generator.next();
+        expected.add(new Pair(second, third));
+        ebo.contentChanged(third, CommitInfo.EMPTY);
+
+        // this one will cause the queue to 'overflow' (full==true)
+        NodeState forth = generator.next();
+        // not adding to expected, as this one ends up in the overflow element
+        ebo.contentChanged(forth, CommitInfo.EMPTY);
+
+        NodeState next;
+        // exclude when queue is full
+        ebo.excludeNext(true);
+        next = generator.next();
+        // if excluded==true and full, hence not adding to expected
+        ebo.contentChanged(next, CommitInfo.EMPTY);
+        // include after an exclude when queue was full
+        // => this is not supported. when the queue
+        ebo.excludeNext(false);
+        next = generator.next();
+        // excluded==false BUT queue full, hence not adding to expected
+        ebo.contentChanged(next, CommitInfo.EMPTY);
+        // let recorder continue
+        recorder.unpause();
+
+        Thread.sleep(1000); // wait for 1 element to be dequeued at least
+        // exclude when queue is no longer full
+        ebo.excludeNext(true);
+        next = generator.next();
+        // this is the interesting one of this test-method: it is marked for
+        // exclusion, however the queue was full with the last entry,
+        // so it still gets delivered, hence:
+        expected.add(new Pair(third /*
+                                     * third since this is a compacted change,
+                                     * not a filtered one
+                                     */, next));
+
+        ebo.contentChanged(next, CommitInfo.EMPTY);
+        assertTrue("testExcludeCommitsWithFullQueue", ebo.waitUntilStopped(5, TimeUnit.SECONDS));
+        assertMatches("testExcludeCommitsWithFullQueue", expected, recorder.includedChanges);
+    }
+
+    @Test
+    public void testExcludSomeCommits() throws Exception {
+        ExecutorService executor = newSingleThreadExecutor();
+        for (int i = 0; i < 100; i++) {
+            doTestExcludeSomeCommits(i, executor);
+        }
+        for (int i = 100; i < 10000; i += 50) {
+            doTestExcludeSomeCommits(i, executor);
+        }
+    }
+
+    private void doTestExcludeSomeCommits(int cnt, Executor executor) throws Exception {
+        Recorder recorder = new Recorder();
+        ExcludingBackgroundObserver ebo = new ExcludingBackgroundObserver(recorder, executor, cnt + 2);
+        closeables.add(ebo);
+        List<Pair> expected = new LinkedList<Pair>();
+        Random r = new Random(2343242); // seed: repeatable tests
+        NodeStateGenerator generator = new NodeStateGenerator();
+        NodeState first = generator.next();
+        expected.add(new Pair(null, first));
+        ebo.contentChanged(first, CommitInfo.EMPTY);
+        NodeState previous = first;
+        for (int i = 0; i < cnt; i++) {
+            boolean excludeNext = r.nextInt(100) < 90;
+            ebo.excludeNext(excludeNext);
+            NodeState next = generator.next();
+            if (!excludeNext) {
+                expected.add(new Pair(previous, next));
+            }
+            previous = next;
+            ebo.contentChanged(next, CommitInfo.EMPTY);
+        }
+        assertTrue("cnt=" + cnt, ebo.waitUntilStopped(5, TimeUnit.SECONDS));
+        assertMatches("cnt=" + cnt, expected, recorder.includedChanges);
+    }
+
 }
