Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilter.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilter.java	(revision 0)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilter.java	(working copy)
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.jcr.observation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.api.observation.JackrabbitEventFilter;
+import org.apache.jackrabbit.oak.plugins.observation.filter.EventAggregator;
+import org.apache.jackrabbit.oak.plugins.observation.filter.FilterBuilder;
+import org.apache.jackrabbit.oak.plugins.observation.filter.FilterBuilder.Condition;
+
+/**
+ * Extension to JackrabbitEventFilter which encapsulates features supported only
+ * by Oak (and not Jackrabbit).
+ */
+public class OakEventFilter extends JackrabbitEventFilter {
+
+    private FilterBuilder builder;
+
+    private Condition all;
+
+    private Condition any;
+
+    private EventAggregator aggregator;
+
+    public OakEventFilter(@Nonnull JackrabbitEventFilter blueprint) {
+        checkNotNull(blueprint);
+        copy(blueprint);
+    }
+
+    private void copy(JackrabbitEventFilter blueprint) {
+        setAbsPath(blueprint.getAbsPath());
+        setAdditionalPaths(blueprint.getAdditionalPaths());
+        setEventTypes(blueprint.getEventTypes());
+        setExcludedPaths(blueprint.getExcludedPaths());
+        String[] identifiers = blueprint.getIdentifiers();
+        if (identifiers != null) {
+            setIdentifiers(identifiers);
+        }
+        setIsDeep(blueprint.getIsDeep());
+        String[] nodeTypes = blueprint.getNodeTypes();
+        if (nodeTypes != null) {
+            setNodeTypes(nodeTypes);
+        }
+        setNoExternal(blueprint.getNoExternal());
+        setNoInternal(blueprint.getNoInternal());
+        setNoLocal(blueprint.getNoLocal());
+    }
+
+    public FilterBuilder builder() {
+        if (builder == null) {
+            builder = new FilterBuilder();
+        }
+        return builder;
+    }
+
+    public OakEventFilter and(Condition... condition) {
+        checkNotNull(condition);
+        if (all == null) {
+            all = builder().all(condition);
+        } else {
+            all = builder().all(all, builder.all(condition));
+        }
+        return this;
+    }
+
+    public OakEventFilter or(Condition... condition) {
+        checkNotNull(condition);
+        if (any == null) {
+            any = builder().any(condition);
+        } else {
+            any = builder().any(any, builder.any(condition));
+        }
+        return this;
+    }
+    
+    public OakEventFilter aggregator(EventAggregator aggregator) {
+        checkNotNull(aggregator);
+        this.aggregator = aggregator;
+        return this;
+    }
+
+    public Condition getAnds() {
+        return all;
+    }
+
+    public Condition getOrs() {
+        return any;
+    }
+
+    public EventAggregator getAggregator() {
+        return aggregator;
+    }
+}

Property changes on: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/OakEventFilter.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java	(revision 1766682)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java	(working copy)
@@ -238,6 +238,19 @@
 
         List<Condition> excludeConditions = createExclusions(filterBuilder, excludedPaths);
 
+        if (filter instanceof OakEventFilter) {
+            OakEventFilter oakEventFilter = (OakEventFilter) filter;
+            Condition ands = oakEventFilter.getAnds();
+            if (ands != null) {
+                excludeConditions.add(ands);
+            }
+            Condition ors = oakEventFilter.getOrs();
+            if (ors != null) {
+                includeConditions.add(ors);
+            }
+            filterBuilder.aggregator(oakEventFilter.getAggregator());
+        }
+        
         filterBuilder
             .includeSessionLocal(!noLocal)
             .includeClusterExternal(!noExternal)
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/util/EventFilterBuilder.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/util/EventFilterBuilder.java	(revision 0)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/util/EventFilterBuilder.java	(working copy)
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.jcr.observation.util;
+
+import static com.google.common.collect.Lists.newArrayList;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+import javax.jcr.RepositoryException;
+
+import org.apache.jackrabbit.api.observation.JackrabbitEventFilter;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.jcr.observation.OakEventFilter;
+import org.apache.jackrabbit.oak.plugins.nodetype.TypePredicate;
+import org.apache.jackrabbit.oak.plugins.observation.filter.ConstantFilter;
+import org.apache.jackrabbit.oak.plugins.observation.filter.EventAggregator;
+import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
+import org.apache.jackrabbit.oak.plugins.observation.filter.FilterBuilder.Condition;
+import org.apache.jackrabbit.oak.plugins.observation.filter.Filters;
+import org.apache.jackrabbit.oak.plugins.observation.filter.GlobbingPathFilter;
+import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+public class EventFilterBuilder {
+
+    /**
+     * 
+     * @param baseFilter the base filter potentially containing other settings
+     * @param globPath glob path that should be added as include path pattern. Note that 
+     * the NamePathMapper is not applied on this globPath.
+     * @return a new filter based on the provided one plus the new glob path as include path
+     */
+    public static JackrabbitEventFilter orGlobPath(JackrabbitEventFilter baseFilter, String globPath) {
+        OakEventFilter originalFilter = oakEventFilter(baseFilter);
+        Condition globPathCondition = originalFilter.builder().path(globPath);
+        return originalFilter.or(globPathCondition);
+    }
+    
+//    private static String wildcardAsRegex(String patternWithWildcards) {
+//        if (patternWithWildcards==null) {
+//            throw new IllegalArgumentException("patternWithWildcards must not be null");
+//        }
+//        return "\\Q"+patternWithWildcards.replace("?", "\\E.\\Q").replace("*", "\\E.*\\Q")+"\\E";
+//    }
+
+    private static String globAsRegex(String patternWithGlobs) {
+        if (patternWithGlobs == null) {
+            return null;
+        }
+        String[] starStarParts = patternWithGlobs.split("\\*\\*", -1);
+        StringBuffer sb = new StringBuffer();
+        sb.append("\\Q");
+        for(int i=0; i<starStarParts.length; i++) {
+            if (i > 0) {
+                // the '**' regexp equivalent
+                sb.append("\\E.*\\Q");
+            }
+            sb.append(starStarParts[i].replace("*", "\\E[^/]*\\Q"));
+        }
+        sb.append("\\E");
+        return sb.toString();
+//        return "\\Q"+patternWithGlobs.replace("**", "\\E.*\\Q").replace("*", "\\E[^/]*\\Q") + "\\E";
+    }
+    
+    static class AggregateFilter implements EventFilter, Condition {
+        
+        private final String[] nodeTypes;
+        private final String[] relativeGlobPaths;
+        private final boolean includeThis;
+        private TypePredicate predicate;
+        
+        AggregateFilter(String[] nodeTypes, String[] relativeGlobPaths) {
+            this.nodeTypes = nodeTypes;
+            this.relativeGlobPaths = relativeGlobPaths;
+            boolean includeThis = false;
+            for (String aRelativeGlobPath : relativeGlobPaths) {
+                if (aRelativeGlobPath.equals("") || aRelativeGlobPath.equals(".") || aRelativeGlobPath.equals("*") || aRelativeGlobPath.equals("**")) {
+                    includeThis = true;
+                }
+            }
+            this.includeThis = includeThis;
+        }
+        
+        @Override
+        public boolean includeAdd(PropertyState after) {
+            // the AggregateFilter 'waits' for the first hit based on nodeTypes
+            // at which point it switches to a GlobbingPathFilter - so property
+            // changes will be handled in the GlobbingPathFilter, never here.
+            return false;
+        }
+        
+        @Override
+        public boolean includeChange(PropertyState before, PropertyState after) {
+            // the AggregateFilter 'waits' for the first hit based on nodeTypes
+            // at which point it switches to a GlobbingPathFilter - so property
+            // changes will be handled in the GlobbingPathFilter, never here.
+            return false;
+        }
+        
+        @Override
+        public boolean includeDelete(PropertyState before) {
+            // the AggregateFilter 'waits' for the first hit based on nodeTypes
+            // at which point it switches to a GlobbingPathFilter - so property
+            // changes will be handled in the GlobbingPathFilter, never here.
+            return false;
+        }
+        
+        @Override
+        public boolean includeAdd(String name, NodeState after) {
+            return includeThis && predicate.apply(after);
+        }
+        
+        @Override
+        public boolean includeDelete(String name, NodeState before) {
+            return includeThis && predicate.apply(before);
+        }
+        
+        @Override
+        public boolean includeMove(String sourcePath, String name, NodeState moved) {
+            return includeThis && predicate.apply(moved);
+        }
+        
+        @Override
+        public boolean includeReorder(String destName, String name, NodeState reordered) {
+            return includeThis && predicate.apply(reordered);
+        }
+
+        @Override
+        public EventFilter create(String name, NodeState before, NodeState after) {
+            boolean predicateMatches = false;
+            if (after.exists()) {
+                predicateMatches = predicate.apply(after);
+            } else {
+                predicateMatches = predicate.apply(before);
+            }
+            if (predicateMatches) {
+                // greedy match - we switch to the globbing path filters
+                List<EventFilter> filters = newArrayList();
+                for (String relativeGlobPath : relativeGlobPaths) {
+                    if (relativeGlobPath.endsWith("*")) {
+                        filters.add(new GlobbingPathFilter(relativeGlobPath));
+                    } else {
+                        filters.add(new GlobbingPathFilter(relativeGlobPath + "/*"));
+                    }
+                }
+                return filters.isEmpty()
+                    ? ConstantFilter.EXCLUDE_ALL
+                    : Filters.any(filters);
+            } else {
+                // non-match - we stay with this filter
+                return this;
+            }
+        }
+
+        @Override
+        public EventFilter createFilter(NodeState before, NodeState after) {
+            if (after.exists()) {
+                predicate = new TypePredicate(after, nodeTypes);
+            } else {
+                predicate = new TypePredicate(before, nodeTypes);
+            }
+            return this;
+        }
+        
+    }
+    
+    /**
+     * Greedy aggregating filter which upon first (hence greedy) hit of provided nodeTypes
+     * checks if the child subtree leading to the actual change matches any
+     * of the provided relativeGlobPaths.
+     * @param baseFilter
+     * @param nodeTypes note that these nodeTypes are not mapped to oak nor validated
+     * @param relativeGlobPath note that this relative path is not mapped to oak, can contain globs.
+     * @return
+     * @throws RepositoryException
+     */
+    public static JackrabbitEventFilter andAggregate(JackrabbitEventFilter baseFilter, final String[] nodeTypes, final String... relativeGlobPaths)
+            throws RepositoryException {
+        OakEventFilter oakEventFilter = oakEventFilter(baseFilter);
+        Condition aggregateCondition = new AggregateFilter(nodeTypes, relativeGlobPaths);
+
+        final Pattern[] relativePathPatterns = new Pattern[relativeGlobPaths.length];
+        for(int i=0; i<relativePathPatterns.length; i++) {
+            relativePathPatterns[i] = Pattern.compile(globAsRegex(relativeGlobPaths[i]));
+        }
+        oakEventFilter.aggregator(new EventAggregator() {
+
+            @Override
+            public int aggregate(NodeState root, List<ChildNodeEntry> parents, ChildNodeEntry childNodeState) {
+                final TypePredicate nodeTypePredicate = new TypePredicate(root, nodeTypes);
+                final int depth = parents.size();
+                for(int i=0; i<depth; i++) {
+                    ChildNodeEntry child = parents.get(i);
+                    NodeState nodeState = child.getNodeState();
+                    if (!nodeTypePredicate.apply(nodeState)) {
+                        continue;
+                    }
+                    if (i + 1 <= depth) {
+                        String childPath = asPath(parents.subList(i + 1, depth));
+                        for (Pattern pattern : relativePathPatterns) {
+                            if (pattern.matcher(childPath).matches()) {
+                                return depth - i;
+                            }
+                        }
+                    }
+                }
+                return 0;
+            }
+            
+            @Override
+            public int aggregate(NodeState root, List<ChildNodeEntry> parents, PropertyState propertyState) {
+                final TypePredicate nodeTypePredicate = new TypePredicate(root, nodeTypes);
+                final int depth = parents.size();
+                for(int i=0; i<depth; i++) {
+                    ChildNodeEntry child = parents.get(i);
+                    NodeState nodeState = child.getNodeState();
+                    if (!nodeTypePredicate.apply(nodeState)) {
+                        continue;
+                    }
+                    if (i + 1 <= depth) {
+                        String childPath = asPath(parents.subList(i + 1, depth));
+                        for (Pattern pattern : relativePathPatterns) {
+                            if (pattern.matcher(childPath).matches()) {
+                                return depth - (i + 1);
+                            }
+                        }
+                    }
+                }
+                return 0;
+            }
+
+            private String asPath(List<ChildNodeEntry> children) {
+                if (children.isEmpty()) {
+                    return "";
+                }
+                StringBuilder sb = new StringBuilder();
+                for (ChildNodeEntry child : children) {
+                    if (sb.length() != 0) {
+                        sb.append("/");
+                    }
+                    sb.append(child.getName());
+                }
+                return sb.toString();
+            }
+            
+        });
+
+        return oakEventFilter.and(aggregateCondition);
+    }
+
+    private static OakEventFilter oakEventFilter(JackrabbitEventFilter filter) {
+        if (filter instanceof OakEventFilter) {
+            return (OakEventFilter) filter;
+        } else {
+            return new OakEventFilter(filter);
+        }
+    }
+
+}

Property changes on: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/util/EventFilterBuilder.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java
===================================================================
--- oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java	(revision 1766682)
+++ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationTest.java	(working copy)
@@ -39,6 +39,7 @@
 import static org.junit.Assume.assumeTrue;
 
 import java.util.Arrays;
+import java.util.Calendar;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -78,6 +79,7 @@
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
 import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
+import org.apache.jackrabbit.oak.jcr.observation.util.EventFilterBuilder;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterBuilder;
 import org.apache.jackrabbit.oak.plugins.observation.filter.Selectors;
 import org.junit.After;
@@ -95,7 +97,7 @@
     private static final String REFERENCEABLE_NODE = "\"referenceable\"";
     private static final String TEST_PATH = '/' + TEST_NODE;
     private static final String TEST_TYPE = "mix:test";
-    public static final int TIME_OUT = 60;
+    public static final int TIME_OUT = 4;
 
     private Session observingSession;
     private ObservationManager observationManager;
@@ -1039,6 +1041,91 @@
 
     //------------------------------------------------------------< ExpectationListener >---
 
+    @Test
+    public void testAggregate1() throws Exception {
+        assumeTrue(observationManager instanceof ObservationManagerImpl);
+        ObservationManagerImpl oManager = (ObservationManagerImpl) observationManager;
+        ExpectationListener listener = new ExpectationListener();
+        JackrabbitEventFilter filter = new JackrabbitEventFilter();
+        filter.setAbsPath("/parent");
+        filter.setIsDeep(true);
+        filter.setEventTypes(ALL_EVENTS);
+        filter = EventFilterBuilder.andAggregate(filter, new String[] {"oak:Unstructured"}, "", "jcr:content", "jcr:content/**");
+        oManager.addEventListener(listener, filter);
+        Node parent = getAdminSession().getRootNode().addNode("parent", "nt:unstructured");
+        Node child = parent.addNode("child", "nt:unstructured");
+        Node file = child.addNode("file", "oak:Unstructured");
+        listener.expectAdd(file);
+        Node jcrContent = file.addNode("jcr:content", "nt:unstructured");
+        listener.expect(jcrContent.getPath(), "/parent/child/file", NODE_ADDED);
+        listener.expect(jcrContent.getPath() + "/jcr:primaryType", "/parent/child/file", PROPERTY_ADDED);
+        Property jcrDataProperty = jcrContent.setProperty("jcr:data", "foo");
+        listener.expect(jcrDataProperty.getPath(), "/parent/child/file", PROPERTY_ADDED);
+        parent.getSession().save();
+    
+        List<Expectation> missing = listener.getMissing(TIME_OUT, TimeUnit.SECONDS);
+        List<Event> unexpected = listener.getUnexpected();
+        assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+        assertTrue("Missing events: " + missing, missing.isEmpty());
+        
+        file = getAdminSession().getRootNode().getNode("parent").getNode("child").getNode("file");
+        jcrContent = file.getNode("jcr:content");
+        Property newProperty = jcrContent.setProperty("newProperty", "foo");
+        listener.expect(newProperty.getPath(), "/parent/child/file", PROPERTY_ADDED);
+        Property lastModifiedBy = jcrContent.setProperty("jcr:lastModifiedBy", "bar");
+        listener.expect(lastModifiedBy.getPath(), "/parent/child/file", PROPERTY_ADDED);
+        jcrContent.getSession().save();
+        
+        Thread.sleep(2000);
+        missing = listener.getMissing(TIME_OUT, TimeUnit.SECONDS);
+        unexpected = listener.getUnexpected();
+        assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+        assertTrue("Missing events: " + missing, missing.isEmpty());
+    }
+
+    @Test
+    public void testAggregate2() throws Exception {
+        assumeTrue(observationManager instanceof ObservationManagerImpl);
+        ObservationManagerImpl oManager = (ObservationManagerImpl) observationManager;
+        ExpectationListener listener = new ExpectationListener();
+        JackrabbitEventFilter filter = new JackrabbitEventFilter();
+        filter.setAbsPath("/parent");
+        filter.setIsDeep(true);
+        filter.setEventTypes(ALL_EVENTS);
+        filter = EventFilterBuilder.andAggregate(filter, new String[] {"oak:Unstructured"}, "", "**");//file", "file/jcr:content", "file/jcr:content/**");
+        oManager.addEventListener(listener, filter);
+        Node parent = getAdminSession().getRootNode().addNode("parent", "nt:unstructured");
+        Node child = parent.addNode("child", "oak:Unstructured");
+        listener.expectAdd(child);
+        Node file = child.addNode("file", "nt:unstructured");
+        listener.expectAdd(file);
+        Node jcrContent = file.addNode("jcr:content", "nt:unstructured");
+        listener.expect(jcrContent.getPath(), "/parent/child", NODE_ADDED);
+        listener.expect(jcrContent.getPath() + "/jcr:primaryType", "/parent/child", PROPERTY_ADDED);
+        Property jcrDataProperty = jcrContent.setProperty("jcr:data", "foo");
+        listener.expect(jcrDataProperty.getPath(), "/parent/child", PROPERTY_ADDED);
+        parent.getSession().save();
+    
+        List<Expectation> missing = listener.getMissing(TIME_OUT, TimeUnit.SECONDS);
+        List<Event> unexpected = listener.getUnexpected();
+        assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+        assertTrue("Missing events: " + missing, missing.isEmpty());
+        
+        file = getAdminSession().getRootNode().getNode("parent").getNode("child").getNode("file");
+        jcrContent = file.getNode("jcr:content");
+        Property newProperty = jcrContent.setProperty("newProperty", "foo");
+        listener.expect(newProperty.getPath(), "/parent/child", PROPERTY_ADDED);
+        Property lastModifiedBy = jcrContent.setProperty("jcr:lastModifiedBy", "bar");
+        listener.expect(lastModifiedBy.getPath(), "/parent/child", PROPERTY_ADDED);
+        jcrContent.getSession().save();
+        
+        Thread.sleep(2000);
+        missing = listener.getMissing(TIME_OUT, TimeUnit.SECONDS);
+        unexpected = listener.getUnexpected();
+        assertTrue("Unexpected events: " + unexpected, unexpected.isEmpty());
+        assertTrue("Missing events: " + missing, missing.isEmpty());
+    }
+
     private static class Expectation extends ForwardingListenableFuture<Event> {
         private final SettableFuture<Event> future = SettableFuture.create();
         private final String name;
@@ -1124,6 +1211,15 @@
             });
         }
 
+        public Future<Event> expect(final String path, final String identifier, final int type) {
+            return expect(new Expectation("path = " + path + ", identifier = " + identifier + ", type = " + type) {
+                @Override
+                public boolean onEvent(Event event) throws RepositoryException {
+                    return type == event.getType() && equal(path, event.getPath()) && equal(identifier, event.getIdentifier());
+                }
+            });
+        }
+
         public Node expectAdd(Node node) throws RepositoryException {
             expect(node.getPath(), NODE_ADDED);
             expect(node.getPath() + "/jcr:primaryType", PROPERTY_ADDED);
