Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java (nonexistent)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java (working copy)
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
+
+/**
+ * A ChangeSetFilter is capable of inspecting a ChangeSet
+ * and deciding if the corresponding consumer
+ * (eg EventListener) is possibly interested in it
+ * or definitely not.
+ *
+ * Falsely deciding to include is fine, falsely
+ * deciding to exclude is not.
+ */
+public interface ChangeSetFilter {
+
+ /**
+ * Decides if the commit belonging to the provided
+ * ChangeSet is potentially relevant to the listener
+ * or if it can definitely be excluded.
+ */
+ boolean excludes(ChangeSet changeSet);
+
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilter.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java (nonexistent)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java (working copy)
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+import static java.util.Collections.disjoint;
+import static org.apache.jackrabbit.oak.commons.PathUtils.concat;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
+
+public class ChangeSetFilterImpl implements ChangeSetFilter {
+
+ private final Set rootIncludePaths;
+ private final Set includePathPatterns;
+ private final Set excludePathPatterns;
+ private final Set parentNodeNames;
+ private final Set parentNodeTypes;
+ private final Set propertyNames;
+
+ public ChangeSetFilterImpl(@Nonnull Set includedParentPaths, boolean isDeep, Set excludedParentPaths,
+ Set parentNodeNames, Set parentNodeTypes, Set propertyNames) {
+ this.rootIncludePaths = new HashSet();
+ this.includePathPatterns = new HashSet();
+ for (String aRawIncludePath : includedParentPaths) {
+ final String aGlobbingIncludePath;
+ if (aRawIncludePath.contains("*")) {
+ // then isDeep is not applicable, it is already a glob path
+ aGlobbingIncludePath = aRawIncludePath;
+ } else {
+ aGlobbingIncludePath = !isDeep ? aRawIncludePath : concat(aRawIncludePath, "**");
+ }
+ this.rootIncludePaths.add(aRawIncludePath);
+ this.includePathPatterns.add(asPattern(aGlobbingIncludePath));
+ }
+ this.excludePathPatterns = new HashSet();
+ for (String aRawExcludePath : excludedParentPaths) {
+ this.excludePathPatterns.add(asPattern(concat(aRawExcludePath, "**")));
+ }
+ this.propertyNames = propertyNames == null ? null : new HashSet(propertyNames);
+ this.parentNodeTypes = parentNodeTypes == null ? null : new HashSet(parentNodeTypes);
+ this.parentNodeNames = parentNodeNames == null ? null : new HashSet(parentNodeNames);
+ }
+
+ private Pattern asPattern(String patternWithGlobs) {
+ return Pattern.compile(GlobbingPathHelper.globPathAsRegex(patternWithGlobs));
+ }
+
+ @Override
+ public boolean excludes(ChangeSet changeSet) {
+ final Set parentPaths = new HashSet(changeSet.getParentPaths());
+
+ // first go through excludes to remove those that are explicitly
+ // excluded
+ if (this.excludePathPatterns.size() != 0) {
+ final Iterator it = parentPaths.iterator();
+ while (it.hasNext()) {
+ final String aParentPath = it.next();
+ if (patternsMatch(this.excludePathPatterns, aParentPath)) {
+ // if an exclude pattern matches, remove the parentPath
+ it.remove();
+ }
+ }
+ }
+ // note that cut-off paths are not applied with excludes,
+ // eg if excludePaths contains /var/foo/bar and path contains /var/foo
+ // with a maxPathLevel of 2, that might very well mean that
+ // the actual path would have been /var/foo/bar, but we don't know.
+ // so we cannot exclude it here and thus have a potential false negative
+ // (ie we didn't exclude it in the prefilter)
+
+ // now remainingPaths contains what is not excluded,
+ // then check if it is included
+ boolean included = false;
+ for (String aPath : parentPaths) {
+ // direct set contains is fastest, lets try that first
+ if (this.rootIncludePaths.contains(aPath)) {
+ included = true;
+ break;
+ }
+ if (patternsMatch(this.includePathPatterns, aPath)) {
+ included = true;
+ break;
+ }
+ }
+
+ if (!included) {
+ // well then we can definitely say that this commit is excluded
+ return true;
+ }
+
+ if (this.propertyNames != null && this.propertyNames.size() != 0) {
+ if (disjoint(changeSet.getPropertyNames(), this.propertyNames)) {
+ return true;
+ }
+ // otherwise we have found a match, but one of the
+ // nodeType/nodeNames
+ // could still filter out, so we have to continue...
+ }
+
+ if (this.parentNodeTypes != null && this.parentNodeTypes.size() != 0) {
+ if (disjoint(changeSet.getParentNodeTypes(), this.parentNodeTypes)) {
+ return true;
+ }
+ // otherwise, again, continue
+ }
+
+ if (this.parentNodeNames != null && this.parentNodeNames.size() != 0) {
+ if (disjoint(changeSet.getParentNodeNames(), this.parentNodeNames)) {
+ return true;
+ }
+ }
+
+ // at this stage we haven't found any exclude, so we're likely including
+ return false;
+ }
+
+ private static boolean patternsMatch(Set pathPatterns, String path) {
+ if (path == null) {
+ return false;
+ }
+ for (Pattern pathPattern : pathPatterns) {
+ if (pathPattern.matcher(path).matches()) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImpl.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java (revision 1768549)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterBuilder.java (working copy)
@@ -42,6 +42,7 @@
import com.google.common.collect.Iterables;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.plugins.nodetype.TypePredicate;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
import org.apache.jackrabbit.oak.plugins.observation.filter.UniversalFilter.Selector;
import org.apache.jackrabbit.oak.plugins.tree.RootFactory;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -59,6 +60,13 @@
private boolean includeClusterLocal = true;
private final List subTrees = newArrayList();
private Condition condition = includeAll();
+ private ChangeSetFilter changeSetFilter = new ChangeSetFilter() {
+
+ @Override
+ public boolean excludes(ChangeSet changeSet) {
+ return false;
+ }
+ };
private EventAggregator aggregator;
@@ -66,6 +74,12 @@
@Nonnull
EventFilter createFilter(@Nonnull NodeState before, @Nonnull NodeState after);
}
+
+ @Nonnull
+ public FilterBuilder setChangeSetFilter(@Nonnull ChangeSetFilter changeSetFilter) {
+ this.changeSetFilter = changeSetFilter;
+ return this;
+ }
/**
* Adds a path to the set of paths whose subtrees include all events of
@@ -380,6 +394,7 @@
final EventAggregator aggregator = FilterBuilder.this.aggregator;
final Iterable subTrees = FilterBuilder.this.getSubTrees();
final Condition condition = FilterBuilder.this.condition;
+ final ChangeSetFilter changeSetFilter = FilterBuilder.this.changeSetFilter;
@Override
public boolean includeCommit(@Nonnull String sessionId, @CheckForNull CommitInfo info) {
@@ -417,6 +432,11 @@
public EventAggregator getEventAggregator() {
return aggregator;
}
+
+ @Override
+ public boolean excludes(ChangeSet changeSet) {
+ return changeSetFilter.excludes(changeSet);
+ }
};
}
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java (revision 1768549)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/filter/FilterProvider.java (working copy)
@@ -28,8 +28,11 @@
/**
* Instance of this class provide a {@link EventFilter} for observation
* events and a filter for commits.
+ *
+ * In order to support OAK-4908 a FilterProvider
+ * extends ChangeSetFilter
*/
-public interface FilterProvider {
+public interface FilterProvider extends ChangeSetFilter {
/**
* Filter whole commits. Only commits for which this method returns
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java (revision 1768549)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java (working copy)
@@ -30,6 +30,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -313,4 +314,28 @@
private static Logger getLogger(@Nonnull Observer observer) {
return LoggerFactory.getLogger(checkNotNull(observer).getClass());
}
+
+
+ /** FOR TESTING ONLY
+ * @throws InterruptedException **/
+ boolean waitUntilStopped(int timeout, TimeUnit unit) throws InterruptedException {
+ long done = System.currentTimeMillis() + unit.toMillis(timeout);
+ boolean added;
+ synchronized(this) {
+ added = queue.offer(STOP);
+ currentTask.onComplete(completionHandler);
+ }
+ while(done > System.currentTimeMillis()) {
+ synchronized(this) {
+ if (!added) {
+ added = queue.offer(STOP);
+ }
+ if (queue.size() == 0 || (queue.size() == 1 && queue.peek() == STOP)) {
+ return true;
+ }
+ wait(1);
+ }
+ }
+ return false;
+ }
}
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java (nonexistent)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java (working copy)
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.observation.filter;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSetBuilder;
+import org.junit.Test;
+
+public class ChangeSetFilterImplTest {
+
+ /** shortcut for creating a set of strings */
+ private Set s(String... entries) {
+ return new HashSet(Arrays.asList(entries));
+ }
+
+ private ChangeSet newChangeSet(int maxPathDepth, Set parentPaths,
+ Set parentNodeNames,
+ Set parentNodeTypes,
+ Set propertyNames) {
+ ChangeSetBuilder changeSetBuilder = new ChangeSetBuilder(Integer.MAX_VALUE, maxPathDepth);
+ changeSetBuilder.getParentPaths().addAll(parentPaths);
+ changeSetBuilder.getParentNodeNames().addAll(parentNodeNames);
+ changeSetBuilder.getParentNodeTypes().addAll(parentNodeTypes);
+ changeSetBuilder.getPropertyNames().addAll(propertyNames);
+ return changeSetBuilder.build();
+ }
+
+ @Test
+ public void testIsDeepFalse() throws Exception {
+ ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), false, s("/excluded"), s(), s(), s());
+
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/child1", "/child2"), s("child1", "child2"), s(), s())));
+ assertFalse(prefilter.excludes(newChangeSet(5, s("/", "/child2"), s("child2"), s(), s())));
+ }
+
+ @Test
+ public void testParentPathsIncludeExclude() throws Exception {
+ ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), true, s("/excluded"), s(), s(), s());
+ assertFalse(prefilter.excludes(newChangeSet(5, s("/a", "/b"), s("a", "b"), s(), s())));
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/excluded/foo", "/excluded/bar"), s("foo", "bar"), s(), s())));
+
+ prefilter = new ChangeSetFilterImpl(s("/included"), true, s("/excluded"), s(), s(), s());
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/a", "/b"), s(), s(), s())));
+ assertFalse(prefilter.excludes(newChangeSet(5, s("/included/a", "/included/b"), s(), s(), s())));
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/excluded/foo", "/excluded/bar"), s(), s(), s())));
+
+ prefilter = new ChangeSetFilterImpl(s("/foo/**/included/**"), true /*ignored for globs */, s("/excluded"), s(), s(), s());
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/a", "/b"), s(), s(), s())));
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/included/a", "/included/b"), s(), s(), s())));
+ assertFalse(prefilter.excludes(newChangeSet(5, s("/foo/included/a"), s(), s(), s())));
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/included/b"), s(), s(), s())));
+ assertFalse(prefilter.excludes(newChangeSet(5, s("/foo/bar/included/a", "/included/b"), s(), s(), s())));
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/excluded/foo", "/excluded/bar"), s(), s(), s())));
+
+ prefilter = new ChangeSetFilterImpl(s("/main/**/included"), true, s("/main/excluded"), s(), s(), s());
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/main", "/main/foo"), s(), s(), s())));
+ assertFalse(prefilter.excludes(newChangeSet(5, s("/main/included", "/main/excluded"), s(), s(), s())));
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded/included", "/main/excluded"), s(), s(), s())));
+
+ prefilter = new ChangeSetFilterImpl(s("/main/included/**"), true, s("/main/excluded"), s(), s(), s());
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/main", "/main/foo"), s(), s(), s())));
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded"), s(), s(), s())));
+ assertFalse(prefilter.excludes(newChangeSet(5, s("/main/included"), s(), s(), s())));
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded/included", "/main/excluded"), s(), s(), s())));
+
+ prefilter = new ChangeSetFilterImpl(s("/main/inc-*/**"), true, s("/main/excluded"), s(), s(), s());
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/main", "/main/foo"), s(), s(), s())));
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded"), s(), s(), s())));
+ assertFalse(prefilter.excludes(newChangeSet(5, s("/main/inc-luded"), s(), s(), s())));
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/main/excluded/included", "/main/excluded"), s(), s(), s())));
+ }
+
+ @Test
+ public void testParentNodeNames() throws Exception {
+ ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), true, s(), s("foo", "bar"), s(), s());
+ assertFalse(prefilter.excludes(newChangeSet(5, s("/a/foo", "/b"), s("foo", "b"), s(), s())));
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/a/zoo", "/b"), s("zoo", "b"), s(), s())));
+ assertFalse(prefilter.excludes(newChangeSet(5, s("/a/zoo", "/bar"), s("zoo", "bar"), s(), s())));
+ }
+
+ @Test
+ public void testParentNodeTypes() throws Exception {
+ ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), true, s(), s(), s("nt:folder"), s());
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/a"), s("a"), s("nt:unstructured"), s())));
+ assertFalse(prefilter.excludes(newChangeSet(5, s("/a"), s("a"), s("nt:folder"), s())));
+ }
+
+ @Test
+ public void testPropertyNames() throws Exception {
+ ChangeSetFilterImpl prefilter = new ChangeSetFilterImpl(s("/"), true, s(), s(), s(), s("jcr:data"));
+ assertTrue(prefilter.excludes(newChangeSet(5, s("/a"), s("a"), s(), s("myProperty"))));
+ assertFalse(prefilter.excludes(newChangeSet(5, s("/a"), s("a"), s(), s("jcr:data"))));
+ }
+
+}
Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/observation/filter/ChangeSetFilterImplTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java (revision 1768549)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.java (working copy)
@@ -20,13 +20,19 @@
package org.apache.jackrabbit.oak.spi.commit;
import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import java.io.Closeable;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -33,11 +39,19 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import com.google.common.collect.Lists;
import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.observation.Filter;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.After;
import org.junit.Test;
+import com.google.common.collect.Lists;
+
+import junit.framework.AssertionFailedError;
+
public class BackgroundObserverTest {
private static final CommitInfo COMMIT_INFO = new CommitInfo("no-session", null);
public static final int CHANGE_COUNT = 1024;
@@ -44,10 +58,12 @@
private final List assertions = Lists.newArrayList();
private CountDownLatch doneCounter;
+ private final List closeables = Lists.newArrayList();
/**
* Assert that each observer of many running concurrently sees the same
- * linearly sequence of commits (i.e. sees the commits in the correct order).
+ * linearly sequence of commits (i.e. sees the commits in the correct
+ * order).
*/
@Test
public void concurrentObservers() throws InterruptedException {
@@ -99,7 +115,7 @@
return new BackgroundObserver(new Observer() {
// Need synchronised list here to maintain correct memory barrier
// when this is passed on to done(List)
- final List assertions = Collections.synchronizedList(Lists.newArrayList());
+ final List assertions = Collections.synchronizedList(Lists. newArrayList());
volatile NodeState previous;
@Override
@@ -125,5 +141,288 @@
}
}, executor, queueLength);
}
+
+ class MyFilter implements Filter {
+ private boolean excludeNext;
+
+ void excludeNext(boolean excludeNext) {
+ this.excludeNext = excludeNext;
+ }
+
+ @Override
+ public boolean excludes(NodeState root, CommitInfo info) {
+ final boolean excludes = excludeNext;
+ excludeNext = false;
+ return excludes;
+ }
+
+ }
+
+ class Recorder implements FilteringAwareObserver {
+
+ List includedChanges = new LinkedList();
+ private boolean pause;
+ private boolean pausing;
+
+ public Recorder() {
+ }
+
+ @Override
+ public void contentChanged(NodeState before, NodeState after, CommitInfo info) {
+ includedChanges.add(new Pair(before, after));
+ maybePause();
+ }
+
+ public void maybePause() {
+ synchronized (this) {
+ try {
+ while (pause) {
+ pausing = true;
+ this.notifyAll();
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ // should not happen
+ }
+ }
+ } finally {
+ pausing = false;
+ this.notifyAll();
+ }
+ }
+ }
+
+ public synchronized void pause() {
+ this.pause = true;
+ }
+
+ public synchronized void unpause() {
+ this.pause = false;
+ this.notifyAll();
+ }
+
+ public boolean waitForPausing(int timeout, TimeUnit unit) throws InterruptedException {
+ final long done = System.currentTimeMillis() + unit.toMillis(timeout);
+ synchronized (this) {
+ while (!pausing && done > System.currentTimeMillis()) {
+ this.wait();
+ }
+ return pausing;
+ }
+ }
+
+ public boolean waitForUnpausing(int timeout, TimeUnit unit) throws InterruptedException {
+ final long done = System.currentTimeMillis() + unit.toMillis(timeout);
+ synchronized (this) {
+ while (pausing && done > System.currentTimeMillis()) {
+ this.wait();
+ }
+ return !pausing;
+ }
+ }
+
+ }
+
+ class Pair {
+ private final NodeState before;
+ private final NodeState after;
+
+ Pair(NodeState before, NodeState after) {
+ this.before = before;
+ this.after = after;
+ }
+
+ @Override
+ public String toString() {
+ return "Pair(before=" + before + ", after=" + after + ")";
+ }
+ }
+
+ class NodeStateGenerator {
+ Random r = new Random(1232131); // seed: repeatable tests
+ NodeBuilder builder = EMPTY_NODE.builder();
+
+ NodeState next() {
+ builder.setProperty("p", r.nextInt());
+ NodeState result = builder.getNodeState();
+ builder = result.builder();
+ return result;
+ }
+ }
+
+ private void assertMatches(String msg, List expected, List actual) {
+ assertEquals("size mismatch. msg=" + msg, expected.size(), actual.size());
+ for (int i = 0; i < expected.size(); i++) {
+ assertSame("mismatch of before at pos=" + i + ", msg=" + msg, expected.get(i).before, actual.get(i).before);
+ assertSame("mismatch of after at pos=" + i + ", msg=" + msg, expected.get(i).after, actual.get(i).after);
+ }
+ }
+
+ @After
+ public void shutDown() throws Exception {
+ for (Closeable closeable : closeables) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ throw new AssertionFailedError(e.getMessage());
+ }
+ }
+ }
+
+ @Test
+ public void testExcludedAllCommits() throws Exception {
+ MyFilter filter = new MyFilter();
+ Recorder recorder = new Recorder();
+ ExecutorService executor = newSingleThreadExecutor();
+ FilteringObserver fo = new FilteringObserver(executor, 5, filter, recorder);
+ closeables.add(fo);
+ List expected = new LinkedList();
+ NodeStateGenerator generator = new NodeStateGenerator();
+ NodeState first = generator.next();
+ expected.add(new Pair(null, first));
+ fo.contentChanged(first, CommitInfo.EMPTY);
+ for (int i = 0; i < 100000; i++) {
+ filter.excludeNext(true);
+ fo.contentChanged(generator.next(), CommitInfo.EMPTY);
+ }
+ assertTrue("testExcludedAllCommits", fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
+ assertMatches("testExcludedAllCommits", expected, recorder.includedChanges);
+ }
+
+ @Test
+ public void testNoExcludedCommits() throws Exception {
+ MyFilter filter = new MyFilter();
+ Recorder recorder = new Recorder();
+ ExecutorService executor = newSingleThreadExecutor();
+ FilteringObserver fo = new FilteringObserver(executor, 10002, filter, recorder);
+ closeables.add(fo);
+ List expected = new LinkedList();
+ NodeStateGenerator generator = new NodeStateGenerator();
+ NodeState first = generator.next();
+ expected.add(new Pair(null, first));
+ fo.contentChanged(first, CommitInfo.EMPTY);
+ NodeState previous = first;
+ for (int i = 0; i < 10000; i++) {
+ filter.excludeNext(false);
+ NodeState next = generator.next();
+ expected.add(new Pair(previous, next));
+ previous = next;
+ fo.contentChanged(next, CommitInfo.EMPTY);
+ }
+ assertTrue("testNoExcludedCommits", fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
+ assertMatches("testNoExcludedCommits", expected, recorder.includedChanges);
+ }
+
+ @Test
+ public void testExcludeCommitsWithFullQueue() throws Exception {
+ MyFilter filter = new MyFilter();
+ Recorder recorder = new Recorder();
+ ExecutorService executor = newSingleThreadExecutor();
+ FilteringObserver fo = new FilteringObserver(executor, 2, filter, recorder);
+ closeables.add(fo);
+ List expected = new LinkedList();
+ NodeStateGenerator generator = new NodeStateGenerator();
+ recorder.pause();
+
+ // the first one will directly go to the recorder
+ NodeState firstIncluded = generator.next();
+ expected.add(new Pair(null, firstIncluded));
+ fo.contentChanged(firstIncluded, CommitInfo.EMPTY);
+
+ assertTrue("observer did not get called (yet?)", recorder.waitForPausing(5, TimeUnit.SECONDS));
+
+ // this one will be queued as #1
+ NodeState secondIncluded = generator.next();
+ expected.add(new Pair(firstIncluded, secondIncluded));
+ fo.contentChanged(secondIncluded, CommitInfo.EMPTY);
+
+ // this one will be queued as #2
+ NodeState thirdIncluded = generator.next();
+ expected.add(new Pair(secondIncluded, thirdIncluded));
+ fo.contentChanged(thirdIncluded, CommitInfo.EMPTY);
+
+ // this one will cause the queue to 'overflow' (full==true)
+ NodeState forthQueueFull = generator.next();
+ // not adding to expected, as this one ends up in the overflow element
+ fo.contentChanged(forthQueueFull, CommitInfo.EMPTY);
+
+ NodeState next;
+ // exclude when queue is full
+ filter.excludeNext(true);
+ next = generator.next();
+ // if excluded==true and full, hence not adding to expected
+ fo.contentChanged(next, CommitInfo.EMPTY);
+ // include after an exclude when queue was full
+ // => this is not supported. when the queue
+ filter.excludeNext(false);
+ next = generator.next();
+ // excluded==false BUT queue full, hence not adding to expected
+ fo.contentChanged(next, CommitInfo.EMPTY);
+ // let recorder continue
+ recorder.unpause();
+
+ recorder.waitForUnpausing(5, TimeUnit.SECONDS);
+ Thread.sleep(1000); // wait for 1 element to be dequeued at least
+ // exclude when queue is no longer full
+ filter.excludeNext(true);
+ NodeState seventhAfterQueueFull = generator.next();
+ // with the introduction of the FilteringAwareObserver this
+ // 'seventhAfterQueueFull' root will not be forwarded
+ // to the BackgroundObserver - thus entirely filtered
+
+ fo.contentChanged(seventhAfterQueueFull, CommitInfo.EMPTY);
+
+ // but with the introduction of FilteringAwareObserver the delivery
+ // only happens with non-filtered items, so adding yet another one now
+ filter.excludeNext(false);
+ NodeState last = generator.next();
+ // while above the "seventhAfterQueueFull" DOES get filtered, the next contentChange
+ // triggers the release of the 'queue full overflow element' (with commitInfo==null)
+ // and that we must add as expected()
+ expected.add(new Pair(thirdIncluded, seventhAfterQueueFull)); // commitInfo == null
+ expected.add(new Pair(seventhAfterQueueFull, last));
+ fo.contentChanged(last, CommitInfo.EMPTY);
+
+ assertTrue("testExcludeCommitsWithFullQueue", fo.getBackgroundObserver().waitUntilStopped(10, TimeUnit.SECONDS));
+ assertMatches("testExcludeCommitsWithFullQueue", expected, recorder.includedChanges);
+ }
+
+ @Test
+ public void testExcludeSomeCommits() throws Exception {
+ ExecutorService executor = newSingleThreadExecutor();
+ for (int i = 0; i < 100; i++) {
+ doTestExcludeSomeCommits(i, executor);
+ }
+ for (int i = 100; i < 10000; i += 50) {
+ doTestExcludeSomeCommits(i, executor);
+ }
+ }
+
+ private void doTestExcludeSomeCommits(int cnt, Executor executor) throws Exception {
+ MyFilter filter = new MyFilter();
+ Recorder recorder = new Recorder();
+ FilteringObserver fo = new FilteringObserver(executor, cnt + 2, filter, recorder);
+ closeables.add(fo);
+ List expected = new LinkedList();
+ Random r = new Random(2343242); // seed: repeatable tests
+ NodeStateGenerator generator = new NodeStateGenerator();
+ NodeState first = generator.next();
+ expected.add(new Pair(null, first));
+ fo.contentChanged(first, CommitInfo.EMPTY);
+ NodeState previous = first;
+ for (int i = 0; i < cnt; i++) {
+ boolean excludeNext = r.nextInt(100) < 90;
+ filter.excludeNext(excludeNext);
+ NodeState next = generator.next();
+ if (!excludeNext) {
+ expected.add(new Pair(previous, next));
+ }
+ previous = next;
+ fo.contentChanged(next, CommitInfo.EMPTY);
+ }
+ assertTrue("cnt=" + cnt, fo.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
+ assertMatches("cnt=" + cnt, expected, recorder.includedChanges);
+ }
+
}
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java (nonexistent)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java (working copy)
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.plugins.observation.Filter;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.junit.Test;
+
+public class PrefilteringBackgroundObserverTest {
+
+ private final boolean EXCLUDED = true;
+ private final boolean INCLUDED = false;
+
+ private List runnableQ;
+ private ExecutorService executor;
+ private CompositeObserver compositeObserver;
+ private List received;
+ private FilteringObserver filteringObserver;
+ private CommitInfo includingCommitInfo = new CommitInfo("includingSession", CommitInfo.OAK_UNKNOWN);
+ private CommitInfo excludingCommitInfo = new CommitInfo("excludingSession", CommitInfo.OAK_UNKNOWN);
+ private int resetCallCnt;
+
+ public void init(int queueLength) throws Exception {
+ runnableQ = new LinkedList();
+ executor = new EnqueuingExecutorService(runnableQ);
+ compositeObserver = new CompositeObserver();
+ received = new LinkedList();
+ filteringObserver = new FilteringObserver(executor, queueLength, new Filter() {
+
+ @Override
+ public boolean excludes(NodeState root, CommitInfo info) {
+ if (info == includingCommitInfo) {
+ return false;
+ } else if (info == excludingCommitInfo) {
+ return true;
+ } else if (info == null) {
+ return false;
+ }
+ throw new IllegalStateException("only supporting include or exclude");
+ }
+ }, new FilteringAwareObserver() {
+
+ NodeState previous;
+
+ @Override
+ public void contentChanged(NodeState before, NodeState after, CommitInfo info) {
+ received.add(new ContentChanged(after, info));
+ if (previous !=null && previous != before) {
+ resetCallCnt++;
+ }
+ previous = after;
+ }
+ });
+ compositeObserver.addObserver(filteringObserver);
+ }
+
+ private final class EnqueuingExecutorService extends AbstractExecutorService {
+ private final List runnableQ;
+
+ private EnqueuingExecutorService(List runnableQ) {
+ this.runnableQ = runnableQ;
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ runnableQ.add(command);
+ }
+
+ @Override
+ public List shutdownNow() {
+ throw new IllegalStateException("nyi");
+ }
+
+ @Override
+ public void shutdown() {
+ throw new IllegalStateException("nyi");
+ }
+
+ @Override
+ public boolean isTerminated() {
+ throw new IllegalStateException("nyi");
+ }
+
+ @Override
+ public boolean isShutdown() {
+ throw new IllegalStateException("nyi");
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ throw new IllegalStateException("nyi");
+ }
+ }
+
+ class ContentChanged {
+ NodeState root;
+ CommitInfo info;
+ ContentChanged(NodeState root, CommitInfo info) {
+ this.root = root;
+ this.info = info;
+ }
+ }
+
+ private static void executeRunnables(final List runnableQ, int num) {
+ for(int i=0; i(runnableQ)) {
+ runnable.run();
+ }
+ }
+ }
+
+ private static NodeState p(int k) {
+ return EMPTY_NODE.builder().setProperty("p", k).getNodeState();
+ }
+
+ @Test
+ public void testFlipping() throws Exception {
+ final int queueLength = 2000;
+ init(queueLength);
+
+ // initialize observer with an initial contentChanged
+ // (see ChangeDispatcher#addObserver)
+ {
+ compositeObserver.contentChanged(p(-1), null);
+ }
+ // Part 1 : first run with filtersEvaluatedMapWithEmptyObservers - empty or null shouldn't matter, it's excluded in both cases
+ for (int k = 0; k < 1000; k++) {
+ CommitInfo info;
+ if (k%2==1) {
+ info = includingCommitInfo;
+ } else {
+ info = excludingCommitInfo;
+ }
+ final NodeState p = p(k);
+ compositeObserver.contentChanged(p, info);
+ if (k%10 == 0) {
+ executeRunnables(runnableQ, 10);
+ }
+ }
+ executeRunnables(runnableQ, 10);
+
+ assertEquals(501, received.size());
+ assertEquals(500, resetCallCnt);
+
+ // Part 2 : run with filtersEvaluatedMapWithNullObservers - empty or null shouldn't matter, it's excluded in both cases
+ received.clear();
+ resetCallCnt = 0;
+ for (int k = 0; k < 1000; k++) {
+ CommitInfo info;
+ if (k%2==1) {
+ info = includingCommitInfo;
+ } else {
+ info = excludingCommitInfo;
+ }
+ final NodeState p = p(k);
+ compositeObserver.contentChanged(p, info);
+ if (k%10 == 0) {
+ executeRunnables(runnableQ, 10);
+ }
+ }
+ executeRunnables(runnableQ, 10);
+
+ assertEquals(500, received.size());
+ assertEquals(500, resetCallCnt);
+
+ // Part 3 : unlike the method name suggests, this variant tests with the filter disabled, so should receive all events normally
+ received.clear();
+ resetCallCnt = 0;
+ for (int k = 0; k < 1000; k++) {
+ CommitInfo info;
+ if (k%2==1) {
+ info = includingCommitInfo;
+ } else {
+ info = includingCommitInfo;
+ }
+ final NodeState p = p(k);
+ compositeObserver.contentChanged(p, info);
+ if (k%10 == 0) {
+ executeRunnables(runnableQ, 10);
+ }
+ }
+ executeRunnables(runnableQ, 10);
+
+ assertEquals(1000, received.size());
+ assertEquals(0, resetCallCnt);
+ }
+
+ @Test
+ public void testFlipping2() throws Exception {
+ doTestFullQueue(6,
+ new TestPattern(INCLUDED, 1, true, 1, 0),
+ new TestPattern(EXCLUDED, 5, true, 0, 0),
+ new TestPattern(INCLUDED, 2, true, 2, 1),
+ new TestPattern(EXCLUDED, 1, true, 0, 0),
+ new TestPattern(INCLUDED, 2, true, 2, 1));
+ }
+
+ @Test
+ public void testQueueNotFull() throws Exception {
+ doTestFullQueue(20,
+ // start: empty queue
+ new TestPattern(EXCLUDED, 1000, false, 0, 0),
+ // here: still empty, just the previousRoot is set to remember above NOOPs
+ new TestPattern(INCLUDED, 5, false, 0, 0),
+ // here: 5 changes are in the queue, the queue fits 20, way to go
+ new TestPattern(EXCLUDED, 500, false, 0, 0),
+ // still 5 in the queue
+ new TestPattern(INCLUDED, 5, false, 0, 0),
+ // now we added 2, queue still not full
+ new TestPattern(EXCLUDED, 0 /* only flush*/, true, 10, 2)
+ );
+ }
+
+ @Test
+ public void testIncludeOnQueueFull() throws Exception {
+ doTestFullQueue(7,
+ // start: empty queue
+ new TestPattern(EXCLUDED, 1000, false, 0, 0, 0, 0),
+ // here: still empty, just the previousRoot is set to remember above NOOPs
+ new TestPattern(INCLUDED, 5, false, 0, 0, 0, 6),
+ // here: 1 init and 5 changes are in the queue, the queue fits 7, so queue is almost full
+ new TestPattern(EXCLUDED, 500, false, 0, 0, 6, 6),
+ // still 6 in the queue, of 7
+ new TestPattern(INCLUDED, 5, false, 0, 0, 6, 7),
+ // now we added 2 (one NOOP and one of those 5), so the queue got full (==7)
+ new TestPattern(EXCLUDED, 0 /* only flush*/, true, 5, 1, 7, 0)
+ );
+ }
+
+ @Test
+ public void testExcludeOnQueueFull2() throws Exception {
+ doTestFullQueue(1,
+ // start: empty queue
+ new TestPattern(INCLUDED, 10, false, 0, 0),
+ new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0),
+ new TestPattern(INCLUDED, 10, false, 0, 0),
+ new TestPattern(EXCLUDED, 10, false, 0, 0),
+ new TestPattern(INCLUDED, 10, false, 0, 0),
+ new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0),
+ new TestPattern(INCLUDED, 10, false, 0, 0),
+ new TestPattern(EXCLUDED, 10, false, 0, 0),
+ new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0),
+ new TestPattern(EXCLUDED, 10, false, 0, 0),
+ new TestPattern(INCLUDED, 10, false, 0, 0),
+ new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0));
+ }
+
+ @Test
+ public void testExcludeOnQueueFull1() throws Exception {
+ doTestFullQueue(4,
+ // start: empty queue
+ new TestPattern(EXCLUDED, 1, false, 0, 0, 0, 0),
+ // here: still empty, just the previousRoot is set to remember above NOOP
+ new TestPattern(INCLUDED, 3, false, 0, 0, 0, 4),
+ // here: 3 changes are in the queue, the queue fits 3, so it just got full now
+ new TestPattern(EXCLUDED, 1, false, 0, 0, 4, 4),
+ // still full but it's ignored, so doesn't have any queue length effect
+ new TestPattern(INCLUDED, 3, false, 0, 0, 4, 4),
+ // adding 3 will not work, it will result in an overflow entry
+ new TestPattern(EXCLUDED, 0 /* only flush*/, true, 3, 1, 4, 0),
+ new TestPattern(INCLUDED, 1, false, 0, 0, 0, 1),
+ new TestPattern(EXCLUDED, 0 /* only flush*/, true, 1, 0, 1, 0)
+ );
+ }
+
+ class TestPattern {
+ final boolean flush;
+ final boolean excluded;
+ final int numEvents;
+ final int expectedNumEvents;
+ final int expectedNumResetCalls;
+ private int expectedQueueSizeAtStart = -1;
+ private int expectedQueueSizeAtEnd = -1;
+ TestPattern(boolean excluded, int numEvents, boolean flush, int expectedNumEvents, int expectedNumResetCalls) {
+ this.flush = flush;
+ this.excluded = excluded;
+ this.numEvents = numEvents;
+ this.expectedNumEvents = expectedNumEvents;
+ this.expectedNumResetCalls = expectedNumResetCalls;
+ }
+ TestPattern(boolean excluded, int numEvents, boolean flush, int expectedNumEvents, int expectedNumResetCalls, int expectedQueueSizeAtStart, int expectedQueueSizeAtEnd) {
+ this(excluded, numEvents, flush, expectedNumEvents, expectedNumResetCalls);
+ this.expectedQueueSizeAtStart = expectedQueueSizeAtStart;
+ this.expectedQueueSizeAtEnd = expectedQueueSizeAtEnd;
+ }
+
+ @Override
+ public String toString() {
+ return "excluded="+excluded+", numEvents="+numEvents+", flush="+flush+", expectedNumEvents="+expectedNumEvents+", expectedNumResetCalls="+expectedNumResetCalls;
+ }
+ }
+
+ private void doTestFullQueue(int queueLength, TestPattern... testPatterns) throws Exception {
+ init(queueLength);
+
+ // initialize observer with an initial contentChanged
+ // (see ChangeDispatcher#addObserver)
+ {
+ compositeObserver.contentChanged(p(-1), null);
+ }
+ // remove above first event right away
+ executeRunnables(runnableQ, 5);
+ received.clear();
+ resetCallCnt = 0;
+
+ int k = 0;
+ int loopCnt = 0;
+ for (TestPattern testPattern : testPatterns) {
+ k++;
+ if (testPattern.expectedQueueSizeAtStart >= 0) {
+ assertEquals("loopCnt="+loopCnt+", queue size mis-match at start",
+ testPattern.expectedQueueSizeAtStart, filteringObserver.getBackgroundObserver().getMBean().getQueueSize());
+ }
+ for(int i=0; i= 0) {
+ assertEquals("loopCnt="+loopCnt+", queue size mis-match at end",
+ testPattern.expectedQueueSizeAtEnd, filteringObserver.getBackgroundObserver().getMBean().getQueueSize());
+ }
+ }
+ }
+
+}
Property changes on: oak-core/src/test/java/org/apache/jackrabbit/oak/spi/commit/PrefilteringBackgroundObserverTest.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java (revision 1768549)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java (working copy)
@@ -45,6 +45,7 @@
import org.apache.jackrabbit.oak.plugins.name.NamespaceEditorProvider;
import org.apache.jackrabbit.oak.plugins.nodetype.TypeEditorProvider;
import org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
import org.apache.jackrabbit.oak.plugins.version.VersionHook;
import org.apache.jackrabbit.oak.query.QueryEngineSettings;
@@ -120,6 +121,7 @@
with(new NamespaceEditorProvider());
with(new TypeEditorProvider());
with(new ConflictValidatorProvider());
+ with(new ChangeCollectorProvider());
with(new ReferenceEditorProvider());
with(new ReferenceIndexProvider());
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (revision 1768549)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (working copy)
@@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkState;
import static org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_COUNTER;
import static org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.OBSERVATION_EVENT_DURATION;
+import static org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider.COMMIT_CONTEXT_OBSERVATION_CHANGESET;
import static org.apache.jackrabbit.oak.plugins.observation.filter.VisibleFilter.VISIBLE_FILTER;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerObserver;
@@ -37,29 +38,32 @@
import javax.jcr.observation.EventIterator;
import javax.jcr.observation.EventListener;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Monitor;
-import com.google.common.util.concurrent.Monitor.Guard;
import org.apache.jackrabbit.api.jmx.EventListenerMBean;
import org.apache.jackrabbit.commons.observation.ListenerTracker;
import org.apache.jackrabbit.oak.api.ContentSession;
import org.apache.jackrabbit.oak.namepath.NamePathMapper;
+import org.apache.jackrabbit.oak.plugins.observation.ChangeSet;
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
+import org.apache.jackrabbit.oak.plugins.observation.Filter;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringDispatcher;
+import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver;
import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterConfigMBean;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
import org.apache.jackrabbit.oak.plugins.observation.filter.Filters;
+import org.apache.jackrabbit.oak.plugins.observation.filter.ChangeSetFilter;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
+import org.apache.jackrabbit.oak.spi.commit.CommitContext;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
-import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
+import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.StatisticManager;
-import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.TimerStats;
import org.apache.jackrabbit.oak.util.PerfLogger;
import org.apache.jackrabbit.stats.TimeSeriesMax;
@@ -66,6 +70,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Monitor;
+import com.google.common.util.concurrent.Monitor.Guard;
+
/**
* A {@code ChangeProcessor} generates observation {@link javax.jcr.observation.Event}s
* based on a {@link FilterProvider filter} and delivers them to an {@link EventListener}.
@@ -73,11 +81,27 @@
* After instantiation a {@code ChangeProcessor} must be started in order to start
* delivering observation events and stopped to stop doing so.
*/
-class ChangeProcessor implements Observer {
+class ChangeProcessor implements FilteringAwareObserver {
private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);
private static final PerfLogger PERF_LOGGER = new PerfLogger(
LoggerFactory.getLogger(ChangeProcessor.class.getName() + ".perf"));
+ private enum FilterResult {
+ /** marks a commit as to be included, ie delivered.
+ * It's okay to falsely mark a commit as included,
+ * since filtering (as part of converting to events)
+ * will be applied at a later stage again. */
+ INCLUDE,
+ /** mark a commit as not of interest to this ChangeProcessor.
+ * Exclusion is definite, ie it's not okay to falsely
+ * mark a commit as excluded */
+ EXCLUDE,
+ /** mark a commit as included but indicate that this
+ * is not a result of prefiltering but that prefiltering
+ * was skipped/not applicable for some reason */
+ PREFILTERING_SKIPPED
+ }
+
/**
* Fill ratio of the revision queue at which commits should be delayed
* (conditional of {@code commitRateLimiter} being non {@code null}).
@@ -89,7 +113,12 @@
* kicks in.
*/
public static final int MAX_DELAY;
-
+
+ /** The test mode can be used to just verify if prefiltering would have
+ * correctly done its job and warn if that's not the case.
+ */
+ private static final boolean PREFILTERING_TESTMODE;
+
// OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using System.properties for now
static {
final String delayThresholdStr = System.getProperty("oak.commitRateLimiter.delayThreshold");
@@ -114,6 +143,18 @@
}
DELAY_THRESHOLD = delayThreshold;
MAX_DELAY = maxDelay;
+
+ final String prefilteringTestModeStr = System.getProperty("oak.observation.prefilteringTestMode");
+ boolean prefilteringTestModeBool = false; // default is enabled
+ try {
+ if (prefilteringTestModeStr != null && prefilteringTestModeStr.length() != 0) {
+ prefilteringTestModeBool = Boolean.parseBoolean(prefilteringTestModeStr);
+ LOG.info(" using oak.observation.prefilteringTestMode = " + prefilteringTestModeBool);
+ }
+ } catch(RuntimeException e) {
+ LOG.warn(" could not parse oak.observation.prefilteringTestMode, using default (" + prefilteringTestModeBool + "): " + e, e);
+ }
+ PREFILTERING_TESTMODE = prefilteringTestModeBool;
}
private static final AtomicInteger COUNTER = new AtomicInteger();
@@ -145,8 +186,22 @@
*/
private CompositeRegistration registration;
- private volatile NodeState previousRoot;
-
+ /**
+ * for statistics: tracks how many times prefiltering excluded a commit
+ */
+ private int prefilterExcludeCount;
+
+ /**
+ * for statistics: tracks how many times prefiltering included a commit
+ */
+ private int prefilterIncludeCount;
+
+ /**
+ * for statistics: tracks how many times prefiltering was ignored (not evaluated at all),
+ * either because it was disabled, queue too small, CommitInfo null or CommitContext null
+ */
+ private int prefilterSkipCount;
+
public ChangeProcessor(
ContentSession contentSession,
NamePathMapper namePathMapper,
@@ -180,6 +235,29 @@
return filterProvider.get();
}
+ @Nonnull
+ public ChangeProcessorMBean getMBean() {
+ return new ChangeProcessorMBean() {
+
+ @Override
+ public int getPrefilterExcludeCount() {
+ return prefilterExcludeCount;
+ }
+
+ @Override
+ public int getPrefilterIncludeCount() {
+ return prefilterIncludeCount;
+ }
+
+ @Override
+ public int getPrefilterSkipCount() {
+ return prefilterSkipCount;
+ }
+
+ };
+ }
+
+
/**
* Start this change processor
* @param whiteboard the whiteboard instance to used for scheduling individual
@@ -190,16 +268,18 @@
checkState(registration == null, "Change processor started already");
final WhiteboardExecutor executor = new WhiteboardExecutor();
executor.start(whiteboard);
- final BackgroundObserver observer = createObserver(executor);
+ final FilteringObserver filteringObserver = createObserver(executor);
listenerId = COUNTER.incrementAndGet() + "";
Map attrs = ImmutableMap.of(LISTENER_ID, listenerId);
String name = tracker.toString();
registration = new CompositeRegistration(
- registerObserver(whiteboard, observer),
+ registerObserver(whiteboard, filteringObserver),
registerMBean(whiteboard, EventListenerMBean.class,
tracker.getListenerMBean(), "EventListener", name, attrs),
registerMBean(whiteboard, BackgroundObserverMBean.class,
- observer.getMBean(), BackgroundObserverMBean.TYPE, name, attrs),
+ filteringObserver.getBackgroundObserver().getMBean(), BackgroundObserverMBean.TYPE, name, attrs),
+ registerMBean(whiteboard, ChangeProcessorMBean.class,
+ getMBean(), ChangeProcessorMBean.TYPE, name, attrs),
//TODO If FilterProvider gets changed later then MBean would need to be
// re-registered
registerMBean(whiteboard, FilterConfigMBean.class,
@@ -207,7 +287,7 @@
new Registration() {
@Override
public void unregister() {
- observer.close();
+ filteringObserver.close();
}
},
new Registration() {
@@ -225,8 +305,9 @@
);
}
- private BackgroundObserver createObserver(final WhiteboardExecutor executor) {
- return new BackgroundObserver(this, executor, queueLength) {
+ private FilteringObserver createObserver(final WhiteboardExecutor executor) {
+ FilteringDispatcher fd = new FilteringDispatcher(this);
+ BackgroundObserver bo = new BackgroundObserver(fd, executor, queueLength) {
private volatile long delay;
private volatile boolean blocking;
@@ -287,7 +368,43 @@
}
}
+
+ @Override
+ public String toString() {
+ return "Prefiltering BackgroundObserver for "+ChangeProcessor.this;
+ }
};
+ return new FilteringObserver(bo, new Filter() {
+
+ @Override
+ public boolean excludes(NodeState root, CommitInfo info) {
+ if (PREFILTERING_TESTMODE) {
+ // then we don't prefilter but only test later
+ prefilterSkipCount++;
+ return false;
+ }
+ final FilterResult filterResult = evalPrefilter(root, info, getChangeSet(info));
+ switch (filterResult) {
+ case PREFILTERING_SKIPPED: {
+ prefilterSkipCount++;
+ return false;
+ }
+ case EXCLUDE: {
+ prefilterExcludeCount++;
+ return true;
+ }
+ case INCLUDE: {
+ prefilterIncludeCount++;
+ return false;
+ }
+ default: {
+ LOG.info("isExcluded: unknown/unsupported filter result: " + filterResult);
+ prefilterSkipCount++;
+ return false;
+ }
+ }
+ }
+ });
}
private final Monitor runningMonitor = new Monitor();
@@ -339,16 +456,50 @@
}
}
+ /**
+ * Utility method that extracts the ChangeSet from a CommitInfo if possible.
+ * @param info
+ * @return
+ */
+ public static ChangeSet getChangeSet(CommitInfo info) {
+ if (info == null) {
+ return null;
+ }
+ CommitContext context = (CommitContext) info.getInfo().get(CommitContext.NAME);
+ if (context == null) {
+ return null;
+ }
+ return (ChangeSet) context.get(COMMIT_CONTEXT_OBSERVATION_CHANGESET);
+ }
+
@Override
- public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
- if (previousRoot != null) {
+ public void contentChanged(@Nonnull NodeState before,
+ @Nonnull NodeState after,
+ @Nullable CommitInfo info) {
+ FilterResult prefilterTestResult = null;
+ if (PREFILTERING_TESTMODE) {
+ // OAK-4908 test mode: when the ChangeCollectorProvider is enabled
+ // there is the option to have the ChangeProcessors run in so-called
+ // 'test mode'. In this test mode the prefiltering is not applied,
+ // but instead verified if it *would have prefiltered correctly*.
+ // that test is therefore done at dequeue-time, hence in
+ // contentChanged
+ // TODO: remove this testing mechanism after a while
try {
+ prefilterTestResult = evalPrefilter(after, info, getChangeSet(info));
+ } catch (Exception e) {
+ LOG.warn("contentChanged: exception in wouldBeExcludedCommit: " + e, e);
+ }
+ }
+ if (before != null) {
+ try {
long start = PERF_LOGGER.start();
FilterProvider provider = filterProvider.get();
+ boolean onEventInvoked = false;
// FIXME don't rely on toString for session id
if (provider.includeCommit(contentSession.toString(), info)) {
- EventFilter filter = provider.getFilter(previousRoot, root);
- EventIterator events = new EventQueue(namePathMapper, info, previousRoot, root,
+ EventFilter filter = provider.getFilter(before, after);
+ EventIterator events = new EventQueue(namePathMapper, info, before, after,
provider.getSubTrees(), Filters.all(filter, VISIBLE_FILTER),
provider.getEventAggregator());
@@ -361,6 +512,7 @@
}
try {
CountingIterator countingEvents = new CountingIterator(events);
+ onEventInvoked = true;
eventListener.onEvent(countingEvents);
countingEvents.updateCounters(eventCount, eventDuration);
} finally {
@@ -371,14 +523,33 @@
}
}
}
+ if (prefilterTestResult != null) {
+ // OAK-4908 test mode
+ if (prefilterTestResult == FilterResult.EXCLUDE && onEventInvoked) {
+ // this is not ok, an event would have gotten
+ // excluded-by-prefiltering even though
+ // it actually got an event.
+ LOG.warn("contentChanged: delivering event which would have been prefiltered, "
+ + "info={}, this={}, listener={}", info, this, eventListener);
+ } else if (prefilterTestResult == FilterResult.INCLUDE && !onEventInvoked && info != null
+ && info != CommitInfo.EMPTY) {
+ // this can occur arbitrarily frequent. as prefiltering
+ // is not perfect, it can
+ // have false negatives - ie it can include even though
+ // no event is then created
+ // hence we can only really log at debug here
+ LOG.debug(
+ "contentChanged: no event to deliver but not prefiltered, info={}, this={}, listener={}",
+ info, this, eventListener);
+ }
+ }
PERF_LOGGER.end(start, 100,
"Generated events (before: {}, after: {})",
- previousRoot, root);
+ before, after);
} catch (Exception e) {
LOG.warn("Error while dispatching observation events for " + tracker, e);
}
}
- previousRoot = root;
}
private static class CountingIterator implements EventIterator {
@@ -490,4 +661,45 @@
+ ", commitRateLimiter=" + commitRateLimiter
+ ", running=" + running.isSatisfied() + "]";
}
+
+ /**
+ * Evaluate the prefilter for a given commit.
+ * @param changeSet
+ *
+ * @return a FilterResult indicating either inclusion, exclusion or
+ * inclusion-due-to-skipping. The latter is used to reflect
+ * prefilter evaluation better in statistics (as it could also have
+ * been reported just as include)
+ */
+ private FilterResult evalPrefilter(NodeState root, CommitInfo info, ChangeSet changeSet) {
+ if (info == null) {
+ return FilterResult.PREFILTERING_SKIPPED;
+ }
+ if (root == null) {
+ // likely only occurs at startup
+ // we can't do any diffing etc, so just not exclude it
+ return FilterResult.PREFILTERING_SKIPPED;
+ }
+
+ final FilterProvider fp = filterProvider.get();
+ // FIXME don't rely on toString for session id
+ if (!fp.includeCommit(contentSession.toString(), info)) {
+ // 'classic' (and cheap pre-) filtering
+ return FilterResult.EXCLUDE;
+ }
+ if (changeSet == null) {
+ // then can't do any prefiltering since it was not
+ // able to complete the sets (within the given boundaries)
+ // (this corresponds to a large commit, which thus can't
+ // go through prefiltering)
+ return FilterResult.PREFILTERING_SKIPPED;
+ }
+
+ final ChangeSetFilter prefilter = fp;
+ if (prefilter.excludes(changeSet)) {
+ return FilterResult.EXCLUDE;
+ } else {
+ return FilterResult.INCLUDE;
+ }
+ }
}
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java (nonexistent)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java (working copy)
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.jcr.observation;
+
+public interface ChangeProcessorMBean {
+ String TYPE = "ChangeProcessorStats";
+
+ /** Returns the number of commits that were excluded by the prefiltering mechanism */
+ int getPrefilterExcludeCount();
+
+ /** Returns the number of commits that were included by the prefiltering mechanism */
+ int getPrefilterIncludeCount();
+
+ /** Returns the number of commits that skipped prefiltering, thus got included */
+ int getPrefilterSkipCount();
+}
Property changes on: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorMBean.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java (revision 1768549)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedListenerMBeanImpl.java (working copy)
@@ -84,6 +84,12 @@
referenceInterface = BackgroundObserverMBean.class,
policy = ReferencePolicy.DYNAMIC,
cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
+ @Reference(name = "changeProcessorMBean",
+ bind = "bindChangeProcessorMBean",
+ unbind = "unbindChangeProcessorMBean",
+ referenceInterface = ChangeProcessorMBean.class,
+ policy = ReferencePolicy.DYNAMIC,
+ cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE),
@Reference(name = "filterConfigMBean",
bind = "bindFilterConfigMBean",
unbind = "unbindFilterConfigMBean",
@@ -96,6 +102,7 @@
private final AtomicInteger observerCount = new AtomicInteger();
private final Map eventListeners = Maps.newConcurrentMap();
private final Map bgObservers = Maps.newConcurrentMap();
+ private final Map changeProcessors = Maps.newConcurrentMap();
private final Map filterConfigs = Maps.newConcurrentMap();
private Registration mbeanReg;
@@ -201,6 +208,11 @@
m.observerMBean = ef.getValue();
}
}
+ for (Map.Entry ef : changeProcessors.entrySet()){
+ if (Objects.equal(getListenerId(ef.getKey()), listenerId)){
+ m.changeProcessorMBean = ef.getValue();
+ }
+ }
mbeans.add(m);
}
return mbeans;
@@ -249,6 +261,16 @@
}
@SuppressWarnings("unused")
+ protected void bindChangeProcessorMBean(ChangeProcessorMBean mbean, Map config){
+ changeProcessors.put(getObjectName(config), mbean);
+ }
+
+ @SuppressWarnings("unused")
+ protected void unbindChangeProcessorMBean(ChangeProcessorMBean mbean, Map config){
+ changeProcessors.remove(getObjectName(config));
+ }
+
+ @SuppressWarnings("unused")
protected void bindListenerMBean(EventListenerMBean mbean, Map config){
eventListeners.put(getObjectName(config), mbean);
}
@@ -280,6 +302,7 @@
private static class ListenerMBeans {
EventListenerMBean eventListenerMBean;
BackgroundObserverMBean observerMBean;
+ ChangeProcessorMBean changeProcessorMBean;
FilterConfigMBean filterConfigMBean;
}
@@ -301,6 +324,9 @@
"ratioOfTimeSpentProcessingEvents",
"eventConsumerTimeRatio",
"queueBacklogMillis",
+ "prefilterSkips",
+ "prefilterExcludes",
+ "prefilterIncludes",
"queueSize",
"localEventCount",
"externalEventCount",
@@ -331,6 +357,9 @@
SimpleType.INTEGER,
SimpleType.INTEGER,
SimpleType.INTEGER,
+ SimpleType.INTEGER,
+ SimpleType.INTEGER,
+ SimpleType.INTEGER,
SimpleType.STRING,
SimpleType.BOOLEAN,
SimpleType.BOOLEAN,
@@ -376,6 +405,9 @@
mbeans.eventListenerMBean.getRatioOfTimeSpentProcessingEvents(),
mbeans.eventListenerMBean.getEventConsumerTimeRatio(),
mbeans.eventListenerMBean.getQueueBacklogMillis(),
+ mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterSkipCount(),
+ mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterExcludeCount(),
+ mbeans.changeProcessorMBean == null ? -1 : mbeans.changeProcessorMBean.getPrefilterIncludeCount(),
mbeans.observerMBean.getQueueSize(),
mbeans.observerMBean.getLocalEventCount(),
mbeans.observerMBean.getExternalEventCount(),
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java (revision 1768549)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilterImpl.java (working copy)
@@ -512,4 +512,21 @@
return this;
}
+ /**
+ * A hook called by the ObservationManagerImpl before creating the ChangeSetFilterImpl
+ * which allows this filter to adjust the includePaths according to its
+ * enabled flags.
+ *
+ * This is used to set the includePath to be '/' in case includeAncestorRemove
+ * is set. The reason for this is that we must catch parent removals and can thus
+ * not apply the normally applied prefilter paths.
+ * @param includePaths the set to adjust depending on filter flags
+ */
+ void adjustPrefilterIncludePaths(Set includePaths) {
+ if (includeAncestorRemove) {
+ includePaths.clear();
+ includePaths.add("/");
+ }
+ }
+
}
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (revision 1768549)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (working copy)
@@ -62,6 +62,7 @@
import org.apache.jackrabbit.oak.plugins.observation.filter.UniversalFilter.Selector;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
import org.apache.jackrabbit.oak.plugins.observation.filter.PermissionProviderFactory;
+import org.apache.jackrabbit.oak.plugins.observation.filter.ChangeSetFilterImpl;
import org.apache.jackrabbit.oak.plugins.observation.filter.Selectors;
import org.apache.jackrabbit.oak.spi.commit.Observable;
import org.apache.jackrabbit.oak.spi.security.authorization.AuthorizationConfiguration;
@@ -278,6 +279,7 @@
List excludeConditions = createExclusions(filterBuilder, excludedPaths);
+ final String[] validatedNodeTypeNames = validateNodeTypeNames(nodeTypeName);
Selector nodeTypeSelector = Selectors.PARENT;
boolean deleteSubtree = true;
if (oakEventFilter != null) {
@@ -304,7 +306,7 @@
filterBuilder.moveSubtree(),
filterBuilder.eventType(eventTypes),
filterBuilder.uuid(Selectors.PARENT, uuids),
- filterBuilder.nodeType(nodeTypeSelector, validateNodeTypeNames(nodeTypeName)),
+ filterBuilder.nodeType(nodeTypeSelector, validatedNodeTypeNames),
filterBuilder.accessControl(permissionProviderFactory));
if (oakEventFilter != null) {
condition = oakEventFilter.wrapMainCondition(condition, filterBuilder, permissionProviderFactory);
@@ -319,6 +321,16 @@
ListenerTracker tracker = new WarningListenerTracker(
!noExternal, listener, eventTypes, absPath, isDeep, uuids, nodeTypeName, noLocal);
+ if (oakEventFilter != null) {
+ oakEventFilter.adjustPrefilterIncludePaths(includePaths);
+ }
+
+ // OAK-4908 : prefiltering support. here we have explicit yes/no/maybe filtering
+ // for things like propertyNames/nodeTypes/nodeNames/paths which cannot be
+ // applied on the full-fledged filterBuilder above but requires an explicit 'prefilter' for that.
+ filterBuilder.setChangeSetFilter(new ChangeSetFilterImpl(includePaths, isDeep, excludedPaths, null,
+ validatedNodeTypeNames == null ? null : newHashSet(validatedNodeTypeNames), null));
+
addEventListener(listener, tracker, filterBuilder.build());
}
Index: oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
===================================================================
--- oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java (revision 1768549)
+++ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java (working copy)
@@ -436,7 +436,25 @@
observationManager.removeEventListener(listener);
}
}
+
+ @Test
+ public void propertyFilter() throws Exception {
+ Node root = getNode("/");
+ ExpectationListener listener = new ExpectationListener();
+ observationManager.addEventListener(listener, PROPERTY_ADDED, "/a/b", false, null, null, false);
+ Node a = root.addNode("a");
+ Node b = a.addNode("b");
+ listener.expect("/a/b/jcr:primaryType", PROPERTY_ADDED);
+ listener.expectAdd(b.setProperty("propName", 1));
+ root.getSession().save();
+
+ List missing = listener.getMissing(TIME_OUT, TimeUnit.SECONDS);
+ assertTrue("Missing events: " + missing, missing.isEmpty());
+ List unexpected = listener.getUnexpected();
+ assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+ }
+
@Test
public void pathFilter() throws Exception {
final String path = "/events/only/here";
@@ -1515,7 +1533,7 @@
filter = new JackrabbitEventFilter();
filter.setEventTypes(ALL_EVENTS);
- filter = FilterFactory.wrap(filter).withIncludeGlobPaths(TEST_PATH + "/a3/**/y/*");
+ filter = FilterFactory.wrap(filter).withIncludeGlobPaths(TEST_PATH + "/a3/**/y");
oManager.addEventListener(listener, filter);
cp = oManager.getChangeProcessor(listener);
assertNotNull(cp);