Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java (nonexistent) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java (working copy) @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.observation.filter; + +import org.apache.jackrabbit.oak.plugins.observation.ChangeSet; + +/** + * A ChangeSetFilter is capable of inspecting a ChangeSet + * and deciding if the corresponding consumer + * (eg EventListener) is possibly interested in it + * or definitely not. + *

+ * Falsely deciding to include is fine, falsely + * deciding to exclude is not. + */ +public interface ChangeSetFilter { + + /** + * Decides if the commit belonging to the provided + * ChangeSet is potentially relevant to the listener + * or if it can definitely be excluded. + */ + boolean excludes(ChangeSet changeSet); + +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java (nonexistent) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java (working copy) @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.observation.filter; + +import static java.util.Collections.disjoint; +import static org.apache.jackrabbit.oak.commons.PathUtils.concat; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.regex.Pattern; + +import javax.annotation.Nonnull; + +import org.apache.jackrabbit.oak.plugins.observation.ChangeSet; + +public class ChangeSetFilterImpl implements ChangeSetFilter { + + private final Set rootIncludePaths; + private final Set includePathPatterns; + private final Set excludePathPatterns; + private final Set parentNodeNames; + private final Set parentNodeTypes; + private final Set propertyNames; + + public ChangeSetFilterImpl(@Nonnull Set includedParentPaths, boolean isDeep, Set excludedParentPaths, + Set parentNodeNames, Set parentNodeTypes, Set propertyNames) { + this.rootIncludePaths = new HashSet(); + this.includePathPatterns = new HashSet(); + for (String aRawIncludePath : includedParentPaths) { + final String aGlobbingIncludePath; + if (aRawIncludePath.contains("*")) { + // then isDeep is not applicable, it is already a glob path + aGlobbingIncludePath = aRawIncludePath; + } else { + aGlobbingIncludePath = !isDeep ? aRawIncludePath : concat(aRawIncludePath, "**"); + } + this.rootIncludePaths.add(aRawIncludePath); + this.includePathPatterns.add(asPattern(aGlobbingIncludePath)); + } + this.excludePathPatterns = new HashSet(); + for (String aRawExcludePath : excludedParentPaths) { + this.excludePathPatterns.add(asPattern(concat(aRawExcludePath, "**"))); + } + this.propertyNames = propertyNames == null ? null : new HashSet(propertyNames); + this.parentNodeTypes = parentNodeTypes == null ? null : new HashSet(parentNodeTypes); + this.parentNodeNames = parentNodeNames == null ? null : new HashSet(parentNodeNames); + } + + private Pattern asPattern(String patternWithGlobs) { + return Pattern.compile(GlobbingPathHelper.globPathAsRegex(patternWithGlobs)); + } + + @Override + public boolean excludes(ChangeSet changeSet) { + final Set parentPaths = new HashSet(changeSet.getParentPaths()); + + // first go through excludes to remove those that are explicitly + // excluded + if (this.excludePathPatterns.size() != 0) { + final Iterator it = parentPaths.iterator(); + while (it.hasNext()) { + final String aParentPath = it.next(); + if (patternsMatch(this.excludePathPatterns, aParentPath)) { + // if an exclude pattern matches, remove the parentPath + it.remove(); + } + } + } + // note that cut-off paths are not applied with excludes, + // eg if excludePaths contains /var/foo/bar and path contains /var/foo + // with a maxPathLevel of 2, that might very well mean that + // the actual path would have been /var/foo/bar, but we don't know. + // so we cannot exclude it here and thus have a potential false negative + // (ie we didn't exclude it in the prefilter) + + // now remainingPaths contains what is not excluded, + // then check if it is included + boolean included = false; + for (String aPath : parentPaths) { + // direct set contains is fastest, lets try that first + if (this.rootIncludePaths.contains(aPath)) { + included = true; + break; + } + if (patternsMatch(this.includePathPatterns, aPath)) { + included = true; + break; + } + } + + if (!included) { + // well then we can definitely say that this commit is excluded + return true; + } + + if (this.propertyNames != null && this.propertyNames.size() != 0) { + if (disjoint(changeSet.getPropertyNames(), this.propertyNames)) { + return true; + } + // otherwise we have found a match, but one of the + // nodeType/nodeNames + // could still filter out, so we have to continue... + } + + if (this.parentNodeTypes != null && this.parentNodeTypes.size() != 0) { + if (disjoint(changeSet.getParentNodeTypes(), this.parentNodeTypes)) { + return true; + } + // otherwise, again, continue + } + + if (this.parentNodeNames != null && this.parentNodeNames.size() != 0) { + if (disjoint(changeSet.getParentNodeNames(), this.parentNodeNames)) { + return true; + } + } + + // at this stage we haven't found any exclude, so we're likely including + return false; + } + + private static boolean patternsMatch(Set pathPatterns, String path) { + if (path == null) { + return false; + } + for (Pattern pathPattern : pathPatterns) { + if (pathPattern.matcher(path).matches()) { + return true; + } + } + return false; + } +} Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java (revision 1768549) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java (working copy) @@ -42,6 +42,7 @@ import com.google.common.collect.Iterables; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.plugins.nodetype.TypePredicate; +import org.apache.jackrabbit.oak.plugins.observation.ChangeSet; import org.apache.jackrabbit.oak.plugins.observation.filter.UniversalFilter.Selector; import org.apache.jackrabbit.oak.plugins.tree.RootFactory; import org.apache.jackrabbit.oak.spi.commit.CommitInfo; @@ -59,6 +60,13 @@ private boolean includeClusterLocal = true; private final List subTrees = newArrayList(); private Condition condition = includeAll(); + private ChangeSetFilter changeSetFilter = new ChangeSetFilter() { + + @Override + public boolean excludes(ChangeSet changeSet) { + return false; + } + }; private EventAggregator aggregator; @@ -66,6 +74,12 @@ @Nonnull EventFilter createFilter(@Nonnull NodeState before, @Nonnull NodeState after); } + + @Nonnull + public FilterBuilder setChangeSetFilter(@Nonnull ChangeSetFilter changeSetFilter) { + this.changeSetFilter = changeSetFilter; + return this; + } /** * Adds a path to the set of paths whose subtrees include all events of @@ -380,6 +394,7 @@ final EventAggregator aggregator = FilterBuilder.this.aggregator; final Iterable subTrees = FilterBuilder.this.getSubTrees(); final Condition condition = FilterBuilder.this.condition; + final ChangeSetFilter changeSetFilter = FilterBuilder.this.changeSetFilter; @Override public boolean includeCommit(@Nonnull String sessionId, @CheckForNull CommitInfo info) { @@ -417,6 +432,11 @@ public EventAggregator getEventAggregator() { return aggregator; } + + @Override + public boolean excludes(ChangeSet changeSet) { + return changeSetFilter.excludes(changeSet); + } }; } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java (revision 1768549) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java (working copy) @@ -28,8 +28,11 @@ /** * Instance of this class provide a {@link EventFilter} for observation * events and a filter for commits. + *

+ * In order to support OAK-4908 a FilterProvider + * extends ChangeSetFilter */ -public interface FilterProvider { +public interface FilterProvider extends ChangeSetFilter { /** * Filter whole commits. Only commits for which this method returns Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java (revision 1768549) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java (working copy) @@ -30,6 +30,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -313,4 +314,28 @@ private static Logger getLogger(@Nonnull Observer observer) { return LoggerFactory.getLogger(checkNotNull(observer).getClass()); } + + + /** FOR TESTING ONLY + * @throws InterruptedException **/ + boolean waitUntilStopped(int timeout, TimeUnit unit) throws InterruptedException { + long done = System.currentTimeMillis() + unit.toMillis(timeout); + boolean added; + synchronized(this) { + added = queue.offer(STOP); + currentTask.onComplete(completionHandler); + } + while(done > System.currentTimeMillis()) { + synchronized(this) { + if (!added) { + added = queue.offer(STOP); + } + if (queue.size() == 0 || (queue.size() == 1 && queue.peek() == STOP)) { + return true; + } + wait(1); + } + } + return false; + } } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java (nonexistent) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java (working copy) @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.observation.filter; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.apache.jackrabbit.oak.plugins.observation.ChangeSet; +import org.apache.jackrabbit.oak.plugins.observation.ChangeSetBuilder; +import org.junit.Test; + +public class ChangeSetFilterImplTest { + + /** shortcut for creating a set of strings */ + private Set s(String... entries) { + return new HashSet(Arrays.asList(entries)); + } + + private ChangeSet newChangeSet(int maxPathDepth, Set parentPaths, + Set parentNodeNames, + Set parentNodeTypes, + Set propertyNames) { + ChangeSetBuilder changeSetBuilder = new ChangeSetBuilder(Integer.MAX_VALUE, maxPathDepth); + changeSetBuilder.getParentPaths().addAll(parentPaths); + changeSetBuilder.getParentNodeNames().addAll(parentNodeNames); + changeSetBuilder.getParentNodeTypes().addAll(parentNodeTypes); + changeSetBuilder.getPropertyNames().addAll(propertyNames); + return changeSetBuilder.build(); + } + + @Test + public void testIsDeepFalse() throws Exception { + ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), false, s("/excluded"), s(), s(), s()); + + assertTrue(prefilter.excludes(newChangeSet(5, s("/child1", "/child2"), s("child1", "child2"), s(), s()))); + assertFalse(prefilter.excludes(newChangeSet(5, s("/", "/child2"), s("child2"), s(), s()))); + } + + @Test + public void testParentPathsIncludeExclude() throws Exception { + ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), true, s("/excluded"), s(), s(), s()); + assertFalse(prefilter.excludes(newChangeSet(5, s("/a", "/b"), s("a", "b"), s(), s()))); + assertTrue(prefilter.excludes(newChangeSet(5, s("/excluded/foo", "/excluded/bar"), s("foo", "bar"), s(), s()))); + + prefilter = new ChangeSetFilterImpl(s("/included"), true, s("/excluded"), s(), s(), s()); + assertTrue(prefilter.excludes(newChangeSet(5, s("/a", "/b"), s(), s(), s()))); + assertFalse(prefilter.excludes(newChangeSet(5, s("/included/a", "/included/b"), s(), s(), s()))); + assertTrue(prefilter.excludes(newChangeSet(5, s("/excluded/foo", "/excluded/bar"), s(), s(), s()))); + + prefilter = new ChangeSetFilterImpl(s("/foo/**/included/**"), true /*ignored for globs */, s("/excluded"), s(), s(), s()); + assertTrue(prefilter.excludes(newChangeSet(5, s("/a", "/b"), s(), s(), s()))); + assertTrue(prefilter.excludes(newChangeSet(5, s("/included/a", "/included/b"), s(), s(), s()))); + assertFalse(prefilter.excludes(newChangeSet(5, s("/foo/included/a"), s(), s(), s()))); + assertTrue(prefilter.excludes(newChangeSet(5, s("/included/b"), s(), s(), s()))); + assertFalse(prefilter.excludes(newChangeSet(5, s("/foo/bar/included/a", "/included/b"), s(), s(), s()))); + assertTrue(prefilter.excludes(newChangeSet(5, s("/excluded/foo", "/excluded/bar"), s(), s(), s()))); + + prefilter = new ChangeSetFilterImpl(s("/main/**/included"), true, s("/main/excluded"), s(), s(), s()); + assertTrue(prefilter.excludes(newChangeSet(5, s("/main", "/main/foo"), s(), s(), s()))); + assertFalse(prefilter.excludes(newChangeSet(5, s("/main/included", "/main/excluded"), s(), s(), s()))); + assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded/included", "/main/excluded"), s(), s(), s()))); + + prefilter = new ChangeSetFilterImpl(s("/main/included/**"), true, s("/main/excluded"), s(), s(), s()); + assertTrue(prefilter.excludes(newChangeSet(5, s("/main", "/main/foo"), s(), s(), s()))); + assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded"), s(), s(), s()))); + assertFalse(prefilter.excludes(newChangeSet(5, s("/main/included"), s(), s(), s()))); + assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded/included", "/main/excluded"), s(), s(), s()))); + + prefilter = new ChangeSetFilterImpl(s("/main/inc-*/**"), true, s("/main/excluded"), s(), s(), s()); + assertTrue(prefilter.excludes(newChangeSet(5, s("/main", "/main/foo"), s(), s(), s()))); + assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded"), s(), s(), s()))); + assertFalse(prefilter.excludes(newChangeSet(5, s("/main/inc-luded"), s(), s(), s()))); + assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded/included", "/main/excluded"), s(), s(), s()))); + } + + @Test + public void testParentNodeNames() throws Exception { + ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), true, s(), s("foo", "bar"), s(), s()); + assertFalse(prefilter.excludes(newChangeSet(5, s("/a/foo", "/b"), s("foo", "b"), s(), s()))); + assertTrue(prefilter.excludes(newChangeSet(5, s("/a/zoo", "/b"), s("zoo", "b"), s(), s()))); + assertFalse(prefilter.excludes(newChangeSet(5, s("/a/zoo", "/bar"), s("zoo", "bar"), s(), s()))); + } + + @Test + public void testParentNodeTypes() throws Exception { + ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), true, s(), s(), s("nt:folder"), s()); + assertTrue(prefilter.excludes(newChangeSet(5, s("/a"), s("a"), s("nt:unstructured"), s()))); + assertFalse(prefilter.excludes(newChangeSet(5, s("/a"), s("a"), s("nt:folder"), s()))); + } + + @Test + public void testPropertyNames() throws Exception { + ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), true, s(), s(), s(), s("jcr:data")); + assertTrue(prefilter.excludes(newChangeSet(5, s("/a"), s("a"), s(), s("myProperty")))); + assertFalse(prefilter.excludes(newChangeSet(5, s("/a"), s("a"), s(), s("jcr:data")))); + } + +} Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java (revision 1768549) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java (working copy) @@ -20,13 +20,19 @@ package org.apache.jackrabbit.oak.spi.commit; import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.Executors.newSingleThreadExecutor; import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import java.io.Closeable; import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -33,11 +39,19 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import com.google.common.collect.Lists; import org.apache.jackrabbit.oak.api.Type; +import org.apache.jackrabbit.oak.plugins.observation.Filter; +import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver; +import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.junit.After; import org.junit.Test; +import com.google.common.collect.Lists; + +import junit.framework.AssertionFailedError; + public class BackgroundObserverTest { private static final CommitInfo COMMIT_INFO = new CommitInfo("no-session", null); public static final int CHANGE_COUNT = 1024; @@ -44,10 +58,12 @@ private final List assertions = Lists.newArrayList(); private CountDownLatch doneCounter; + private final List closeables = Lists.newArrayList(); /** * Assert that each observer of many running concurrently sees the same - * linearly sequence of commits (i.e. sees the commits in the correct order). + * linearly sequence of commits (i.e. sees the commits in the correct + * order). */ @Test public void concurrentObservers() throws InterruptedException { @@ -99,7 +115,7 @@ return new BackgroundObserver(new Observer() { // Need synchronised list here to maintain correct memory barrier // when this is passed on to done(List) - final List assertions = Collections.synchronizedList(Lists.newArrayList()); + final List assertions = Collections.synchronizedList(Lists. newArrayList()); volatile NodeState previous; @Override @@ -125,5 +141,288 @@ } }, executor, queueLength); } + + class MyFilter implements Filter { + private boolean excludeNext; + + void excludeNext(boolean excludeNext) { + this.excludeNext = excludeNext; + } + + @Override + public boolean excludes(NodeState root, CommitInfo info) { + final boolean excludes = excludeNext; + excludeNext = false; + return excludes; + } + + } + + class Recorder implements FilteringAwareObserver { + + List includedChanges = new LinkedList(); + private boolean pause; + private boolean pausing; + + public Recorder() { + } + + @Override + public void contentChanged(NodeState before, NodeState after, CommitInfo info) { + includedChanges.add(new Pair(before, after)); + maybePause(); + } + + public void maybePause() { + synchronized (this) { + try { + while (pause) { + pausing = true; + this.notifyAll(); + try { + this.wait(); + } catch (InterruptedException e) { + // should not happen + } + } + } finally { + pausing = false; + this.notifyAll(); + } + } + } + + public synchronized void pause() { + this.pause = true; + } + + public synchronized void unpause() { + this.pause = false; + this.notifyAll(); + } + + public boolean waitForPausing(int timeout, TimeUnit unit) throws InterruptedException { + final long done = System.currentTimeMillis() + unit.toMillis(timeout); + synchronized (this) { + while (!pausing && done > System.currentTimeMillis()) { + this.wait(); + } + return pausing; + } + } + + public boolean waitForUnpausing(int timeout, TimeUnit unit) throws InterruptedException { + final long done = System.currentTimeMillis() + unit.toMillis(timeout); + synchronized (this) { + while (pausing && done > System.currentTimeMillis()) { + this.wait(); + } + return !pausing; + } + } + + } + + class Pair { + private final NodeState before; + private final NodeState after; + + Pair(NodeState before, NodeState after) { + this.before = before; + this.after = after; + } + + @Override + public String toString() { + return "Pair(before=" + before + ", after=" + after + ")"; + } + } + + class NodeStateGenerator { + Random r = new Random(1232131); // seed: repeatable tests + NodeBuilder builder = EMPTY_NODE.builder(); + + NodeState next() { + builder.setProperty("p", r.nextInt()); + NodeState result = builder.getNodeState(); + builder = result.builder(); + return result; + } + } + + private void assertMatches(String msg, List expected, List actual) { + assertEquals("size mismatch. msg=" + msg, expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i++) { + assertSame("mismatch of before at pos=" + i + ", msg=" + msg, expected.get(i).before, actual.get(i).before); + assertSame("mismatch of after at pos=" + i + ", msg=" + msg, expected.get(i).after, actual.get(i).after); + } + } + + @After + public void shutDown() throws Exception { + for (Closeable closeable : closeables) { + try { + closeable.close(); + } catch (Exception e) { + throw new AssertionFailedError(e.getMessage()); + } + } + } + + @Test + public void testExcludedAllCommits() throws Exception { + MyFilter filter = new MyFilter(); + Recorder recorder = new Recorder(); + ExecutorService executor = newSingleThreadExecutor(); + FilteringObserver fo = new FilteringObserver(executor, 5, filter, recorder); + closeables.add(fo); + List expected = new LinkedList(); + NodeStateGenerator generator = new NodeStateGenerator(); + NodeState first = generator.next(); + expected.add(new Pair(null, first)); + fo.contentChanged(first, CommitInfo.EMPTY); + for (int i = 0; i < 100000; i++) { + filter.excludeNext(true); + fo.contentChanged(generator.next(), CommitInfo.EMPTY); + } + assertTrue("testExcludedAllCommits", fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS)); + assertMatches("testExcludedAllCommits", expected, recorder.includedChanges); + } + + @Test + public void testNoExcludedCommits() throws Exception { + MyFilter filter = new MyFilter(); + Recorder recorder = new Recorder(); + ExecutorService executor = newSingleThreadExecutor(); + FilteringObserver fo = new FilteringObserver(executor, 10002, filter, recorder); + closeables.add(fo); + List expected = new LinkedList(); + NodeStateGenerator generator = new NodeStateGenerator(); + NodeState first = generator.next(); + expected.add(new Pair(null, first)); + fo.contentChanged(first, CommitInfo.EMPTY); + NodeState previous = first; + for (int i = 0; i < 10000; i++) { + filter.excludeNext(false); + NodeState next = generator.next(); + expected.add(new Pair(previous, next)); + previous = next; + fo.contentChanged(next, CommitInfo.EMPTY); + } + assertTrue("testNoExcludedCommits", fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS)); + assertMatches("testNoExcludedCommits", expected, recorder.includedChanges); + } + + @Test + public void testExcludeCommitsWithFullQueue() throws Exception { + MyFilter filter = new MyFilter(); + Recorder recorder = new Recorder(); + ExecutorService executor = newSingleThreadExecutor(); + FilteringObserver fo = new FilteringObserver(executor, 2, filter, recorder); + closeables.add(fo); + List expected = new LinkedList(); + NodeStateGenerator generator = new NodeStateGenerator(); + recorder.pause(); + + // the first one will directly go to the recorder + NodeState firstIncluded = generator.next(); + expected.add(new Pair(null, firstIncluded)); + fo.contentChanged(firstIncluded, CommitInfo.EMPTY); + + assertTrue("observer did not get called (yet?)", recorder.waitForPausing(5, TimeUnit.SECONDS)); + + // this one will be queued as #1 + NodeState secondIncluded = generator.next(); + expected.add(new Pair(firstIncluded, secondIncluded)); + fo.contentChanged(secondIncluded, CommitInfo.EMPTY); + + // this one will be queued as #2 + NodeState thirdIncluded = generator.next(); + expected.add(new Pair(secondIncluded, thirdIncluded)); + fo.contentChanged(thirdIncluded, CommitInfo.EMPTY); + + // this one will cause the queue to 'overflow' (full==true) + NodeState forthQueueFull = generator.next(); + // not adding to expected, as this one ends up in the overflow element + fo.contentChanged(forthQueueFull, CommitInfo.EMPTY); + + NodeState next; + // exclude when queue is full + filter.excludeNext(true); + next = generator.next(); + // if excluded==true and full, hence not adding to expected + fo.contentChanged(next, CommitInfo.EMPTY); + // include after an exclude when queue was full + // => this is not supported. when the queue + filter.excludeNext(false); + next = generator.next(); + // excluded==false BUT queue full, hence not adding to expected + fo.contentChanged(next, CommitInfo.EMPTY); + // let recorder continue + recorder.unpause(); + + recorder.waitForUnpausing(5, TimeUnit.SECONDS); + Thread.sleep(1000); // wait for 1 element to be dequeued at least + // exclude when queue is no longer full + filter.excludeNext(true); + NodeState seventhAfterQueueFull = generator.next(); + // with the introduction of the FilteringAwareObserver this + // 'seventhAfterQueueFull' root will not be forwarded + // to the BackgroundObserver - thus entirely filtered + + fo.contentChanged(seventhAfterQueueFull, CommitInfo.EMPTY); + + // but with the introduction of FilteringAwareObserver the delivery + // only happens with non-filtered items, so adding yet another one now + filter.excludeNext(false); + NodeState last = generator.next(); + // while above the "seventhAfterQueueFull" DOES get filtered, the next contentChange + // triggers the release of the 'queue full overflow element' (with commitInfo==null) + // and that we must add as expected() + expected.add(new Pair(thirdIncluded, seventhAfterQueueFull)); // commitInfo == null + expected.add(new Pair(seventhAfterQueueFull, last)); + fo.contentChanged(last, CommitInfo.EMPTY); + + assertTrue("testExcludeCommitsWithFullQueue", fo.getBackgroundObserver().waitUntilStopped(10, TimeUnit.SECONDS)); + assertMatches("testExcludeCommitsWithFullQueue", expected, recorder.includedChanges); + } + + @Test + public void testExcludeSomeCommits() throws Exception { + ExecutorService executor = newSingleThreadExecutor(); + for (int i = 0; i < 100; i++) { + doTestExcludeSomeCommits(i, executor); + } + for (int i = 100; i < 10000; i += 50) { + doTestExcludeSomeCommits(i, executor); + } + } + + private void doTestExcludeSomeCommits(int cnt, Executor executor) throws Exception { + MyFilter filter = new MyFilter(); + Recorder recorder = new Recorder(); + FilteringObserver fo = new FilteringObserver(executor, cnt + 2, filter, recorder); + closeables.add(fo); + List expected = new LinkedList(); + Random r = new Random(2343242); // seed: repeatable tests + NodeStateGenerator generator = new NodeStateGenerator(); + NodeState first = generator.next(); + expected.add(new Pair(null, first)); + fo.contentChanged(first, CommitInfo.EMPTY); + NodeState previous = first; + for (int i = 0; i < cnt; i++) { + boolean excludeNext = r.nextInt(100) < 90; + filter.excludeNext(excludeNext); + NodeState next = generator.next(); + if (!excludeNext) { + expected.add(new Pair(previous, next)); + } + previous = next; + fo.contentChanged(next, CommitInfo.EMPTY); + } + assertTrue("cnt=" + cnt, fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS)); + assertMatches("cnt=" + cnt, expected, recorder.includedChanges); + } + } Index: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java (nonexistent) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java (working copy) @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.spi.commit; + +import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE; +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.jackrabbit.oak.plugins.observation.Filter; +import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver; +import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.junit.Test; + +public class PrefilteringBackgroundObserverTest { + + private final boolean EXCLUDED = true; + private final boolean INCLUDED = false; + + private List runnableQ; + private ExecutorService executor; + private CompositeObserver compositeObserver; + private List received; + private FilteringObserver filteringObserver; + private CommitInfo includingCommitInfo = new CommitInfo("includingSession", CommitInfo.OAK_UNKNOWN); + private CommitInfo excludingCommitInfo = new CommitInfo("excludingSession", CommitInfo.OAK_UNKNOWN); + private int resetCallCnt; + + public void init(int queueLength) throws Exception { + runnableQ = new LinkedList(); + executor = new EnqueuingExecutorService(runnableQ); + compositeObserver = new CompositeObserver(); + received = new LinkedList(); + filteringObserver = new FilteringObserver(executor, queueLength, new Filter() { + + @Override + public boolean excludes(NodeState root, CommitInfo info) { + if (info == includingCommitInfo) { + return false; + } else if (info == excludingCommitInfo) { + return true; + } else if (info == null) { + return false; + } + throw new IllegalStateException("only supporting include or exclude"); + } + }, new FilteringAwareObserver() { + + NodeState previous; + + @Override + public void contentChanged(NodeState before, NodeState after, CommitInfo info) { + received.add(new ContentChanged(after, info)); + if (previous !=null && previous != before) { + resetCallCnt++; + } + previous = after; + } + }); + compositeObserver.addObserver(filteringObserver); + } + + private final class EnqueuingExecutorService extends AbstractExecutorService { + private final List runnableQ; + + private EnqueuingExecutorService(List runnableQ) { + this.runnableQ = runnableQ; + } + + @Override + public void execute(Runnable command) { + runnableQ.add(command); + } + + @Override + public List shutdownNow() { + throw new IllegalStateException("nyi"); + } + + @Override + public void shutdown() { + throw new IllegalStateException("nyi"); + } + + @Override + public boolean isTerminated() { + throw new IllegalStateException("nyi"); + } + + @Override + public boolean isShutdown() { + throw new IllegalStateException("nyi"); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + throw new IllegalStateException("nyi"); + } + } + + class ContentChanged { + NodeState root; + CommitInfo info; + ContentChanged(NodeState root, CommitInfo info) { + this.root = root; + this.info = info; + } + } + + private static void executeRunnables(final List runnableQ, int num) { + for(int i=0; i(runnableQ)) { + runnable.run(); + } + } + } + + private static NodeState p(int k) { + return EMPTY_NODE.builder().setProperty("p", k).getNodeState(); + } + + @Test + public void testFlipping() throws Exception { + final int queueLength = 2000; + init(queueLength); + + // initialize observer with an initial contentChanged + // (see ChangeDispatcher#addObserver) + { + compositeObserver.contentChanged(p(-1), null); + } + // Part 1 : first run with filtersEvaluatedMapWithEmptyObservers - empty or null shouldn't matter, it's excluded in both cases + for (int k = 0; k < 1000; k++) { + CommitInfo info; + if (k%2==1) { + info = includingCommitInfo; + } else { + info = excludingCommitInfo; + } + final NodeState p = p(k); + compositeObserver.contentChanged(p, info); + if (k%10 == 0) { + executeRunnables(runnableQ, 10); + } + } + executeRunnables(runnableQ, 10); + + assertEquals(501, received.size()); + assertEquals(500, resetCallCnt); + + // Part 2 : run with filtersEvaluatedMapWithNullObservers - empty or null shouldn't matter, it's excluded in both cases + received.clear(); + resetCallCnt = 0; + for (int k = 0; k < 1000; k++) { + CommitInfo info; + if (k%2==1) { + info = includingCommitInfo; + } else { + info = excludingCommitInfo; + } + final NodeState p = p(k); + compositeObserver.contentChanged(p, info); + if (k%10 == 0) { + executeRunnables(runnableQ, 10); + } + } + executeRunnables(runnableQ, 10); + + assertEquals(500, received.size()); + assertEquals(500, resetCallCnt); + + // Part 3 : unlike the method name suggests, this variant tests with the filter disabled, so should receive all events normally + received.clear(); + resetCallCnt = 0; + for (int k = 0; k < 1000; k++) { + CommitInfo info; + if (k%2==1) { + info = includingCommitInfo; + } else { + info = includingCommitInfo; + } + final NodeState p = p(k); + compositeObserver.contentChanged(p, info); + if (k%10 == 0) { + executeRunnables(runnableQ, 10); + } + } + executeRunnables(runnableQ, 10); + + assertEquals(1000, received.size()); + assertEquals(0, resetCallCnt); + } + + @Test + public void testFlipping2() throws Exception { + doTestFullQueue(6, + new TestPattern(INCLUDED, 1, true, 1, 0), + new TestPattern(EXCLUDED, 5, true, 0, 0), + new TestPattern(INCLUDED, 2, true, 2, 1), + new TestPattern(EXCLUDED, 1, true, 0, 0), + new TestPattern(INCLUDED, 2, true, 2, 1)); + } + + @Test + public void testQueueNotFull() throws Exception { + doTestFullQueue(20, + // start: empty queue + new TestPattern(EXCLUDED, 1000, false, 0, 0), + // here: still empty, just the previousRoot is set to remember above NOOPs + new TestPattern(INCLUDED, 5, false, 0, 0), + // here: 5 changes are in the queue, the queue fits 20, way to go + new TestPattern(EXCLUDED, 500, false, 0, 0), + // still 5 in the queue + new TestPattern(INCLUDED, 5, false, 0, 0), + // now we added 2, queue still not full + new TestPattern(EXCLUDED, 0 /* only flush*/, true, 10, 2) + ); + } + + @Test + public void testIncludeOnQueueFull() throws Exception { + doTestFullQueue(7, + // start: empty queue + new TestPattern(EXCLUDED, 1000, false, 0, 0, 0, 0), + // here: still empty, just the previousRoot is set to remember above NOOPs + new TestPattern(INCLUDED, 5, false, 0, 0, 0, 6), + // here: 1 init and 5 changes are in the queue, the queue fits 7, so queue is almost full + new TestPattern(EXCLUDED, 500, false, 0, 0, 6, 6), + // still 6 in the queue, of 7 + new TestPattern(INCLUDED, 5, false, 0, 0, 6, 7), + // now we added 2 (one NOOP and one of those 5), so the queue got full (==7) + new TestPattern(EXCLUDED, 0 /* only flush*/, true, 5, 1, 7, 0) + ); + } + + @Test + public void testExcludeOnQueueFull2() throws Exception { + doTestFullQueue(1, + // start: empty queue + new TestPattern(INCLUDED, 10, false, 0, 0), + new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0), + new TestPattern(INCLUDED, 10, false, 0, 0), + new TestPattern(EXCLUDED, 10, false, 0, 0), + new TestPattern(INCLUDED, 10, false, 0, 0), + new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0), + new TestPattern(INCLUDED, 10, false, 0, 0), + new TestPattern(EXCLUDED, 10, false, 0, 0), + new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0), + new TestPattern(EXCLUDED, 10, false, 0, 0), + new TestPattern(INCLUDED, 10, false, 0, 0), + new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0)); + } + + @Test + public void testExcludeOnQueueFull1() throws Exception { + doTestFullQueue(4, + // start: empty queue + new TestPattern(EXCLUDED, 1, false, 0, 0, 0, 0), + // here: still empty, just the previousRoot is set to remember above NOOP + new TestPattern(INCLUDED, 3, false, 0, 0, 0, 4), + // here: 3 changes are in the queue, the queue fits 3, so it just got full now + new TestPattern(EXCLUDED, 1, false, 0, 0, 4, 4), + // still full but it's ignored, so doesn't have any queue length effect + new TestPattern(INCLUDED, 3, false, 0, 0, 4, 4), + // adding 3 will not work, it will result in an overflow entry + new TestPattern(EXCLUDED, 0 /* only flush*/, true, 3, 1, 4, 0), + new TestPattern(INCLUDED, 1, false, 0, 0, 0, 1), + new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0, 1, 0) + ); + } + + class TestPattern { + final boolean flush; + final boolean excluded; + final int numEvents; + final int expectedNumEvents; + final int expectedNumResetCalls; + private int expectedQueueSizeAtStart = -1; + private int expectedQueueSizeAtEnd = -1; + TestPattern(boolean excluded, int numEvents, boolean flush, int expectedNumEvents, int expectedNumResetCalls) { + this.flush = flush; + this.excluded = excluded; + this.numEvents = numEvents; + this.expectedNumEvents = expectedNumEvents; + this.expectedNumResetCalls = expectedNumResetCalls; + } + TestPattern(boolean excluded, int numEvents, boolean flush, int expectedNumEvents, int expectedNumResetCalls, int expectedQueueSizeAtStart, int expectedQueueSizeAtEnd) { + this(excluded, numEvents, flush, expectedNumEvents, expectedNumResetCalls); + this.expectedQueueSizeAtStart = expectedQueueSizeAtStart; + this.expectedQueueSizeAtEnd = expectedQueueSizeAtEnd; + } + + @Override + public String toString() { + return "excluded="+excluded+", numEvents="+numEvents+", flush="+flush+", expectedNumEvents="+expectedNumEvents+", expectedNumResetCalls="+expectedNumResetCalls; + } + } + + private void doTestFullQueue(int queueLength, TestPattern... testPatterns) throws Exception { + init(queueLength); + + // initialize observer with an initial contentChanged + // (see ChangeDispatcher#addObserver) + { + compositeObserver.contentChanged(p(-1), null); + } + // remove above first event right away + executeRunnables(runnableQ, 5); + received.clear(); + resetCallCnt = 0; + + int k = 0; + int loopCnt = 0; + for (TestPattern testPattern : testPatterns) { + k++; + if (testPattern.expectedQueueSizeAtStart >= 0) { + assertEquals("loopCnt="+loopCnt+", queue size mis-match at start", + testPattern.expectedQueueSizeAtStart, filteringObserver.getBackgroundObserver().getMBean().getQueueSize()); + } + for(int i=0; i using oak.observation.prefilteringTestMode = " + prefilteringTestModeBool); + } + } catch(RuntimeException e) { + LOG.warn(" could not parse oak.observation.prefilteringTestMode, using default (" + prefilteringTestModeBool + "): " + e, e); + } + PREFILTERING_TESTMODE = prefilteringTestModeBool; } private static final AtomicInteger COUNTER = new AtomicInteger(); @@ -145,8 +186,22 @@ */ private CompositeRegistration registration; - private volatile NodeState previousRoot; - + /** + * for statistics: tracks how many times prefiltering excluded a commit + */ + private int prefilterExcludeCount; + + /** + * for statistics: tracks how many times prefiltering included a commit + */ + private int prefilterIncludeCount; + + /** + * for statistics: tracks how many times prefiltering was ignored (not evaluated at all), + * either because it was disabled, queue too small, CommitInfo null or CommitContext null + */ + private int prefilterSkipCount; + public ChangeProcessor( ContentSession contentSession, NamePathMapper namePathMapper, @@ -180,6 +235,29 @@ return filterProvider.get(); } + @Nonnull + public ChangeProcessorMBean getMBean() { + return new ChangeProcessorMBean() { + + @Override + public int getPrefilterExcludeCount() { + return prefilterExcludeCount; + } + + @Override + public int getPrefilterIncludeCount() { + return prefilterIncludeCount; + } + + @Override + public int getPrefilterSkipCount() { + return prefilterSkipCount; + } + + }; + } + + /** * Start this change processor * @param whiteboard the whiteboard instance to used for scheduling individual @@ -190,16 +268,18 @@ checkState(registration == null, "Change processor started already"); final WhiteboardExecutor executor = new WhiteboardExecutor(); executor.start(whiteboard); - final BackgroundObserver observer = createObserver(executor); + final FilteringObserver filteringObserver = createObserver(executor); listenerId = COUNTER.incrementAndGet() + ""; Map attrs = ImmutableMap.of(LISTENER_ID, listenerId); String name = tracker.toString(); registration = new CompositeRegistration( - registerObserver(whiteboard, observer), + registerObserver(whiteboard, filteringObserver), registerMBean(whiteboard, EventListenerMBean.class, tracker.getListenerMBean(), "EventListener", name, attrs), registerMBean(whiteboard, BackgroundObserverMBean.class, - observer.getMBean(), BackgroundObserverMBean.TYPE, name, attrs), + filteringObserver.getBackgroundObserver().getMBean(), BackgroundObserverMBean.TYPE, name, attrs), + registerMBean(whiteboard, ChangeProcessorMBean.class, + getMBean(), ChangeProcessorMBean.TYPE, name, attrs), //TODO If FilterProvider gets changed later then MBean would need to be // re-registered registerMBean(whiteboard, FilterConfigMBean.class, @@ -207,7 +287,7 @@ new Registration() { @Override public void unregister() { - observer.close(); + filteringObserver.close(); } }, new Registration() { @@ -225,8 +305,9 @@ ); } - private BackgroundObserver createObserver(final WhiteboardExecutor executor) { - return new BackgroundObserver(this, executor, queueLength) { + private FilteringObserver createObserver(final WhiteboardExecutor executor) { + FilteringDispatcher fd = new FilteringDispatcher(this); + BackgroundObserver bo = new BackgroundObserver(fd, executor, queueLength) { private volatile long delay; private volatile boolean blocking; @@ -287,7 +368,43 @@ } } + + @Override + public String toString() { + return "Prefiltering BackgroundObserver for "+ChangeProcessor.this; + } }; + return new FilteringObserver(bo, new Filter() { + + @Override + public boolean excludes(NodeState root, CommitInfo info) { + if (PREFILTERING_TESTMODE) { + // then we don't prefilter but only test later + prefilterSkipCount++; + return false; + } + final FilterResult filterResult = evalPrefilter(root, info, getChangeSet(info)); + switch (filterResult) { + case PREFILTERING_SKIPPED: { + prefilterSkipCount++; + return false; + } + case EXCLUDE: { + prefilterExcludeCount++; + return true; + } + case INCLUDE: { + prefilterIncludeCount++; + return false; + } + default: { + LOG.info("isExcluded: unknown/unsupported filter result: " + filterResult); + prefilterSkipCount++; + return false; + } + } + } + }); } private final Monitor runningMonitor = new Monitor(); @@ -339,16 +456,50 @@ } } + /** + * Utility method that extracts the ChangeSet from a CommitInfo if possible. + * @param info + * @return + */ + public static ChangeSet getChangeSet(CommitInfo info) { + if (info == null) { + return null; + } + CommitContext context = (CommitContext) info.getInfo().get(CommitContext.NAME); + if (context == null) { + return null; + } + return (ChangeSet) context.get(COMMIT_CONTEXT_OBSERVATION_CHANGESET); + } + @Override - public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) { - if (previousRoot != null) { + public void contentChanged(@Nonnull NodeState before, + @Nonnull NodeState after, + @Nullable CommitInfo info) { + FilterResult prefilterTestResult = null; + if (PREFILTERING_TESTMODE) { + // OAK-4908 test mode: when the ChangeCollectorProvider is enabled + // there is the option to have the ChangeProcessors run in so-called + // 'test mode'. In this test mode the prefiltering is not applied, + // but instead verified if it *would have prefiltered correctly*. + // that test is therefore done at dequeue-time, hence in + // contentChanged + // TODO: remove this testing mechanism after a while try { + prefilterTestResult = evalPrefilter(after, info, getChangeSet(info)); + } catch (Exception e) { + LOG.warn("contentChanged: exception in wouldBeExcludedCommit: " + e, e); + } + } + if (before != null) { + try { long start = PERF_LOGGER.start(); FilterProvider provider = filterProvider.get(); + boolean onEventInvoked = false; // FIXME don't rely on toString for session id if (provider.includeCommit(contentSession.toString(), info)) { - EventFilter filter = provider.getFilter(previousRoot, root); - EventIterator events = new EventQueue(namePathMapper, info, previousRoot, root, + EventFilter filter = provider.getFilter(before, after); + EventIterator events = new EventQueue(namePathMapper, info, before, after, provider.getSubTrees(), Filters.all(filter, VISIBLE_FILTER), provider.getEventAggregator()); @@ -361,6 +512,7 @@ } try { CountingIterator countingEvents = new CountingIterator(events); + onEventInvoked = true; eventListener.onEvent(countingEvents); countingEvents.updateCounters(eventCount, eventDuration); } finally { @@ -371,14 +523,33 @@ } } } + if (prefilterTestResult != null) { + // OAK-4908 test mode + if (prefilterTestResult == FilterResult.EXCLUDE && onEventInvoked) { + // this is not ok, an event would have gotten + // excluded-by-prefiltering even though + // it actually got an event. + LOG.warn("contentChanged: delivering event which would have been prefiltered, " + + "info={}, this={}, listener={}", info, this, eventListener); + } else if (prefilterTestResult == FilterResult.INCLUDE && !onEventInvoked && info != null + && info != CommitInfo.EMPTY) { + // this can occur arbitrarily frequent. as prefiltering + // is not perfect, it can + // have false negatives - ie it can include even though + // no event is then created + // hence we can only really log at debug here + LOG.debug( + "contentChanged: no event to deliver but not prefiltered, info={}, this={}, listener={}", + info, this, eventListener); + } + } PERF_LOGGER.end(start, 100, "Generated events (before: {}, after: {})", - previousRoot, root); + before, after); } catch (Exception e) { LOG.warn("Error while dispatching observation events for " + tracker, e); } } - previousRoot = root; } private static class CountingIterator implements EventIterator { @@ -490,4 +661,45 @@ + ", commitRateLimiter=" + commitRateLimiter + ", running=" + running.isSatisfied() + "]"; } + + /** + * Evaluate the prefilter for a given commit. + * @param changeSet + * + * @return a FilterResult indicating either inclusion, exclusion or + * inclusion-due-to-skipping. The latter is used to reflect + * prefilter evaluation better in statistics (as it could also have + * been reported just as include) + */ + private FilterResult evalPrefilter(NodeState root, CommitInfo info, ChangeSet changeSet) { + if (info == null) { + return FilterResult.PREFILTERING_SKIPPED; + } + if (root == null) { + // likely only occurs at startup + // we can't do any diffing etc, so just not exclude it + return FilterResult.PREFILTERING_SKIPPED; + } + + final FilterProvider fp = filterProvider.get(); + // FIXME don't rely on toString for session id + if (!fp.includeCommit(contentSession.toString(), info)) { + // 'classic' (and cheap pre-) filtering + return FilterResult.EXCLUDE; + } + if (changeSet == null) { + // then can't do any prefiltering since it was not + // able to complete the sets (within the given boundaries) + // (this corresponds to a large commit, which thus can't + // go through prefiltering) + return FilterResult.PREFILTERING_SKIPPED; + } + + final ChangeSetFilter prefilter = fp; + if (prefilter.excludes(changeSet)) { + return FilterResult.EXCLUDE; + } else { + return FilterResult.INCLUDE; + } + } } Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java =================================================================== --- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java (nonexistent) +++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java (working copy) @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.jcr.observation; + +public interface ChangeProcessorMBean { + String TYPE = "ChangeProcessorStats"; + + /** Returns the number of commits that were excluded by the prefiltering mechanism */ + int getPrefilterExcludeCount(); + + /** Returns the number of commits that were included by the prefiltering mechanism */ + int getPrefilterIncludeCount(); + + /** Returns the number of commits that skipped prefiltering, thus got included */ + int getPrefilterSkipCount(); +} Property changes on: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java =================================================================== --- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java (revision 1768549) +++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java (working copy) @@ -84,6 +84,12 @@ referenceInterface = BackgroundObserverMBean.class, policy = ReferencePolicy.DYNAMIC, cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE), + @Reference(name = "changeProcessorMBean", + bind = "bindChangeProcessorMBean", + unbind = "unbindChangeProcessorMBean", + referenceInterface = ChangeProcessorMBean.class, + policy = ReferencePolicy.DYNAMIC, + cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE), @Reference(name = "filterConfigMBean", bind = "bindFilterConfigMBean", unbind = "unbindFilterConfigMBean", @@ -96,6 +102,7 @@ private final AtomicInteger observerCount = new AtomicInteger(); private final Map eventListeners = Maps.newConcurrentMap(); private final Map bgObservers = Maps.newConcurrentMap(); + private final Map changeProcessors = Maps.newConcurrentMap(); private final Map filterConfigs = Maps.newConcurrentMap(); private Registration mbeanReg; @@ -201,6 +208,11 @@ m.observerMBean = ef.getValue(); } } + for (Map.Entry ef : changeProcessors.entrySet()){ + if (Objects.equal(getListenerId(ef.getKey()), listenerId)){ + m.changeProcessorMBean = ef.getValue(); + } + } mbeans.add(m); } return mbeans; @@ -249,6 +261,16 @@ } @SuppressWarnings("unused") + protected void bindChangeProcessorMBean(ChangeProcessorMBean mbean, Map config){ + changeProcessors.put(getObjectName(config), mbean); + } + + @SuppressWarnings("unused") + protected void unbindChangeProcessorMBean(ChangeProcessorMBean mbean, Map config){ + changeProcessors.remove(getObjectName(config)); + } + + @SuppressWarnings("unused") protected void bindListenerMBean(EventListenerMBean mbean, Map config){ eventListeners.put(getObjectName(config), mbean); } @@ -280,6 +302,7 @@ private static class ListenerMBeans { EventListenerMBean eventListenerMBean; BackgroundObserverMBean observerMBean; + ChangeProcessorMBean changeProcessorMBean; FilterConfigMBean filterConfigMBean; } @@ -301,6 +324,9 @@ "ratioOfTimeSpentProcessingEvents", "eventConsumerTimeRatio", "queueBacklogMillis", + "prefilterSkips", + "prefilterExcludes", + "prefilterIncludes", "queueSize", "localEventCount", "externalEventCount", @@ -331,6 +357,9 @@ SimpleType.INTEGER, SimpleType.INTEGER, SimpleType.INTEGER, + SimpleType.INTEGER, + SimpleType.INTEGER, + SimpleType.INTEGER, SimpleType.STRING, SimpleType.BOOLEAN, SimpleType.BOOLEAN, @@ -376,6 +405,9 @@ mbeans.eventListenerMBean.getRatioOfTimeSpentProcessingEvents(), mbeans.eventListenerMBean.getEventConsumerTimeRatio(), mbeans.eventListenerMBean.getQueueBacklogMillis(), + mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterSkipCount(), + mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterExcludeCount(), + mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterIncludeCount(), mbeans.observerMBean.getQueueSize(), mbeans.observerMBean.getLocalEventCount(), mbeans.observerMBean.getExternalEventCount(), Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java =================================================================== --- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java (revision 1768549) +++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java (working copy) @@ -512,4 +512,21 @@ return this; } + /** + * A hook called by the ObservationManagerImpl before creating the ChangeSetFilterImpl + * which allows this filter to adjust the includePaths according to its + * enabled flags. + *

+ * This is used to set the includePath to be '/' in case includeAncestorRemove + * is set. The reason for this is that we must catch parent removals and can thus + * not apply the normally applied prefilter paths. + * @param includePaths the set to adjust depending on filter flags + */ + void adjustPrefilterIncludePaths(Set includePaths) { + if (includeAncestorRemove) { + includePaths.clear(); + includePaths.add("/"); + } + } + } Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java =================================================================== --- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (revision 1768549) +++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (working copy) @@ -62,6 +62,7 @@ import org.apache.jackrabbit.oak.plugins.observation.filter.UniversalFilter.Selector; import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider; import org.apache.jackrabbit.oak.plugins.observation.filter.PermissionProviderFactory; +import org.apache.jackrabbit.oak.plugins.observation.filter.ChangeSetFilterImpl; import org.apache.jackrabbit.oak.plugins.observation.filter.Selectors; import org.apache.jackrabbit.oak.spi.commit.Observable; import org.apache.jackrabbit.oak.spi.security.authorization.AuthorizationConfiguration; @@ -278,6 +279,7 @@ List excludeConditions = createExclusions(filterBuilder, excludedPaths); + final String[] validatedNodeTypeNames = validateNodeTypeNames(nodeTypeName); Selector nodeTypeSelector = Selectors.PARENT; boolean deleteSubtree = true; if (oakEventFilter != null) { @@ -304,7 +306,7 @@ filterBuilder.moveSubtree(), filterBuilder.eventType(eventTypes), filterBuilder.uuid(Selectors.PARENT, uuids), - filterBuilder.nodeType(nodeTypeSelector, validateNodeTypeNames(nodeTypeName)), + filterBuilder.nodeType(nodeTypeSelector, validatedNodeTypeNames), filterBuilder.accessControl(permissionProviderFactory)); if (oakEventFilter != null) { condition = oakEventFilter.wrapMainCondition(condition, filterBuilder, permissionProviderFactory); @@ -319,6 +321,16 @@ ListenerTracker tracker = new WarningListenerTracker( !noExternal, listener, eventTypes, absPath, isDeep, uuids, nodeTypeName, noLocal); + if (oakEventFilter != null) { + oakEventFilter.adjustPrefilterIncludePaths(includePaths); + } + + // OAK-4908 : prefiltering support. here we have explicit yes/no/maybe filtering + // for things like propertyNames/nodeTypes/nodeNames/paths which cannot be + // applied on the full-fledged filterBuilder above but requires an explicit 'prefilter' for that. + filterBuilder.setChangeSetFilter(new ChangeSetFilterImpl(includePaths, isDeep, excludedPaths, null, + validatedNodeTypeNames == null ? null : newHashSet(validatedNodeTypeNames), null)); + addEventListener(listener, tracker, filterBuilder.build()); } Index: oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java =================================================================== --- oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java (revision 1768549) +++ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java (working copy) @@ -436,7 +436,25 @@ observationManager.removeEventListener(listener); } } + + @Test + public void propertyFilter() throws Exception { + Node root = getNode("/"); + ExpectationListener listener = new ExpectationListener(); + observationManager.addEventListener(listener, PROPERTY_ADDED, "/a/b", false, null, null, false); + Node a = root.addNode("a"); + Node b = a.addNode("b"); + listener.expect("/a/b/jcr:primaryType", PROPERTY_ADDED); + listener.expectAdd(b.setProperty("propName", 1)); + root.getSession().save(); + + List missing = listener.getMissing(TIME_OUT, TimeUnit.SECONDS); + assertTrue("Missing events: " + missing, missing.isEmpty()); + List unexpected = listener.getUnexpected(); + assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty()); + } + @Test public void pathFilter() throws Exception { final String path = "/events/only/here"; @@ -1515,7 +1533,7 @@ filter = new JackrabbitEventFilter(); filter.setEventTypes(ALL_EVENTS); - filter = FilterFactory.wrap(filter).withIncludeGlobPaths(TEST_PATH + "/a3/**/y/*"); + filter = FilterFactory.wrap(filter).withIncludeGlobPaths(TEST_PATH + "/a3/**/y"); oManager.addEventListener(listener, filter); cp = oManager.getChangeProcessor(listener); assertNotNull(cp);