Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java	(revision 1465953)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java	(working copy)
@@ -16,7 +16,6 @@
  */
 package org.apache.jackrabbit.oak.jcr;
 
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import javax.annotation.Nonnull;
@@ -52,10 +51,6 @@
 
     private final Oak oak;
 
-    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(0);
-
-    private SecurityProvider securityProvider;
-
     public Jcr(Oak oak) {
         this.oak = oak;
 
@@ -126,7 +121,6 @@
     @Nonnull
     public final Jcr with(@Nonnull SecurityProvider securityProvider) {
         oak.with(checkNotNull(securityProvider));
-        this.securityProvider = securityProvider;
         return this;
     }
 
@@ -138,13 +132,15 @@
 
     @Nonnull
     public final Jcr with(@Nonnull ScheduledExecutorService executor) {
-        this.executor = checkNotNull(executor);
+        oak.with(checkNotNull(executor));
         return this;
     }
 
     public Repository createRepository() {
         return new RepositoryImpl(
-                oak.createContentRepository(), executor, securityProvider);
+                oak.createContentRepository(), 
+                oak.getExecutorService(), 
+                oak.getSecurityProvider());
     }
 
 }
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentRepositoryImpl.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentRepositoryImpl.java	(revision 1465953)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/core/ContentRepositoryImpl.java	(working copy)
@@ -25,6 +25,7 @@
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.api.ContentSession;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
+import org.apache.jackrabbit.oak.spi.observation.ObservationSchedulerImpl;
 import org.apache.jackrabbit.oak.spi.query.CompositeQueryIndexProvider;
 import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
@@ -45,6 +46,7 @@
     private final String defaultWorkspaceName;
     private final SecurityProvider securityProvider;
     private final QueryIndexProvider indexProvider;
+    private final ObservationSchedulerImpl observation;
 
     /**
      * Creates an content repository instance based on the given, already
@@ -60,12 +62,14 @@
                                  @Nonnull CommitHook commitHook,
                                  @Nonnull String defaultWorkspaceName,
                                  @Nullable QueryIndexProvider indexProvider,
-                                 @Nonnull SecurityProvider securityProvider) {
+                                 @Nonnull SecurityProvider securityProvider,
+                                 @Nonnull ObservationSchedulerImpl observation) {
         this.nodeStore = checkNotNull(nodeStore);
         this.commitHook = checkNotNull(commitHook);
         this.defaultWorkspaceName = checkNotNull(defaultWorkspaceName);
         this.securityProvider = checkNotNull(securityProvider);
         this.indexProvider = indexProvider != null ? indexProvider : new CompositeQueryIndexProvider();
+        this.observation = observation;
     }
 
     @Nonnull
@@ -93,4 +97,8 @@
         return nodeStore;
     }
 
+    public ObservationSchedulerImpl getObservationScheduler() {
+        return observation;
+    }
+
 }
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java	(revision 1465953)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java	(working copy)
@@ -18,6 +18,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
 
 import javax.annotation.Nonnull;
 import javax.jcr.NoSuchWorkspaceException;
@@ -47,6 +48,7 @@
 import org.apache.jackrabbit.oak.spi.lifecycle.OakInitializer;
 import org.apache.jackrabbit.oak.spi.lifecycle.RepositoryInitializer;
 import org.apache.jackrabbit.oak.spi.lifecycle.WorkspaceInitializer;
+import org.apache.jackrabbit.oak.spi.observation.ObservationSchedulerImpl;
 import org.apache.jackrabbit.oak.spi.query.CompositeQueryIndexProvider;
 import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
 import org.apache.jackrabbit.oak.spi.security.SecurityConfiguration;
@@ -57,6 +59,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Lists.newArrayList;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
 
 /**
  * Builder class for constructing {@link ContentRepository} instances with
@@ -87,6 +90,8 @@
 
     private SecurityProvider securityProvider;
 
+    private ScheduledExecutorService executor = newScheduledThreadPool(0);
+
     private String defaultWorkspaceName = DEFAULT_WORKSPACE_NAME;
 
     public Oak(NodeStore store) {
@@ -213,6 +218,11 @@
         return this;
     }
 
+    @Nonnull
+    public SecurityProvider getSecurityProvider() {
+        return this.securityProvider;
+    }
+
     /**
      * Associates the given conflict handler with the repository to be created.
      *
@@ -226,6 +236,17 @@
         return this;
     }
 
+    @Nonnull
+    public Oak with(@Nonnull ScheduledExecutorService executorService) {
+        this.executor = executorService;
+        return this;
+    }
+
+    @Nonnull
+    public ScheduledExecutorService getExecutorService() {
+        return this.executor;
+    }
+
     public ContentRepository createContentRepository() {
         IndexHookProvider indexHooks = CompositeIndexHookProvider.compose(indexHookProviders);
         OakInitializer.initialize(store, new CompositeInitializer(initializers), indexHooks);
@@ -250,8 +271,10 @@
                 defaultWorkspaceName, indexHooks, indexProvider,
                 CompositeHook.compose(initHooks));
 
+        ObservationSchedulerImpl observationScheduler = new ObservationSchedulerImpl(executor, store);
+
         // add index hooks later to prevent the OakInitializer to do excessive indexing
-        with(IndexHookManager.of(indexHooks));
+        with(IndexHookManager.of(indexHooks).setObservation(observationScheduler));
         withEditorHook();
         CommitHook commitHook = CompositeHook.compose(commitHooks);
         return new ContentRepositoryImpl(
@@ -259,7 +282,8 @@
                 commitHook,
                 defaultWorkspaceName,
                 indexProvider,
-                securityProvider);
+                securityProvider,
+                observationScheduler);
     }
 
     /**
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/observation/ObservationScheduler.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/observation/ObservationScheduler.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/observation/ObservationScheduler.java	(revision 0)
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.spi.observation;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+
+public interface ObservationScheduler {
+
+    void addListener(EditorProvider provider);
+
+    void addListener(EditorProvider provider, ExecutorService executor);
+
+    public void removeListener(EditorProvider provider);
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/observation/ObservationScheduler.java
___________________________________________________________________
Added: svn:keywords
   + Author Date Id Revision Rev URL
Added: svn:eol-style
   + native

Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/observation/ObservationSchedulerImpl.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/observation/ObservationSchedulerImpl.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/observation/ObservationSchedulerImpl.java	(revision 0)
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.spi.observation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.spi.commit.EditorHook;
+import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+
+public class ObservationSchedulerImpl implements ObservationScheduler {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(ObservationSchedulerImpl.class);
+
+    private final ScheduledExecutorService executor;
+    private final Map<EditorProvider, ExecutorService> providers = new ConcurrentHashMap<EditorProvider, ExecutorService>();
+
+    public ObservationSchedulerImpl(@Nonnull ScheduledExecutorService executor,
+            @Nonnull NodeStore store) {
+        this.executor = checkNotNull(executor);
+        this.executor.scheduleWithFixedDelay(new ObservationThread(
+                checkNotNull(store), this), 100, 1000, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void addListener(EditorProvider provider, ExecutorService executor) {
+        providers.put(provider, executor);
+    }
+
+    @Override
+    public void addListener(EditorProvider provider) {
+        addListener(provider, executor);
+    }
+
+    @Override
+    public void removeListener(EditorProvider provider) {
+        providers.remove(provider);
+    }
+
+    private static class ObservationThread implements Runnable {
+
+        private final NodeStore store;
+        private final ObservationSchedulerImpl scheduler;
+        private NodeState baseLine;
+
+        public ObservationThread(NodeStore store,
+                ObservationSchedulerImpl scheduler) {
+            this.store = store;
+            this.scheduler = scheduler;
+            this.baseLine = store.getRoot();
+        }
+
+        @Override
+        public void run() {
+            Map<EditorProvider, ExecutorService> providers = ImmutableMap
+                    .copyOf(scheduler.providers);
+            final NodeState head = store.getRoot();
+
+            List<Future<NodeState>> futures = new ArrayList<Future<NodeState>>();
+            for (Entry<EditorProvider, ExecutorService> entry : providers
+                    .entrySet()) {
+                Future<NodeState> f = entry.getValue().submit(
+                        new ObservationDiff(baseLine, head, entry.getKey()));
+                futures.add(f);
+            }
+
+            for (Future<NodeState> f : futures) {
+                try {
+                    // TODO what to do with the resulting nodestate?
+                    f.get();
+                } catch (InterruptedException e) {
+                    log.error(e.getMessage(), e);
+                } catch (ExecutionException e) {
+                    log.error(e.getMessage(), e);
+                }
+            }
+
+            baseLine = head;
+        }
+    }
+
+    private static class ObservationDiff implements Callable<NodeState> {
+
+        private final NodeState before;
+        private final NodeState after;
+        private final EditorProvider provider;
+
+        ObservationDiff(NodeState before, NodeState after,
+                EditorProvider provider) {
+            this.before = before;
+            this.after = after;
+            this.provider = provider;
+        }
+
+        @Override
+        public NodeState call() throws CommitFailedException {
+            EditorHook hook = new EditorHook(provider);
+            return hook.processCommit(before, after);
+        }
+
+    }
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/observation/ObservationSchedulerImpl.java
___________________________________________________________________
Added: svn:keywords
   + Author Date Id Revision Rev URL
Added: svn:eol-style
   + native

Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUtils.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUtils.java	(revision 1465953)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUtils.java	(working copy)
@@ -152,4 +152,15 @@
         return new IndexDefinitionImpl(name, type, concat(path, name));
     }
 
+    public static boolean isIndexNodeType(NodeState state) {
+        PropertyState ps = state.getProperty(JCR_PRIMARYTYPE);
+        return ps != null && !ps.isArray()
+                && ps.getValue(Type.STRING).equals(INDEX_DEFINITIONS_NODE_TYPE);
+    }
+
+    public static boolean getBoolean(NodeState state, String property) {
+        PropertyState ps = state.getProperty(property);
+        return ps == null || (ps != null && ps.getValue(Type.BOOLEAN));
+    }
+
 }
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexHookManagerDiff.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexHookManagerDiff.java	(revision 1465953)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexHookManagerDiff.java	(working copy)
@@ -16,12 +16,12 @@
  */
 package org.apache.jackrabbit.oak.plugins.index;
 
-import static org.apache.jackrabbit.JcrConstants.JCR_PRIMARYTYPE;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
-import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NODE_TYPE;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
-import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_UNKNOWN;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.getBoolean;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.isIndexNodeType;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 
 import java.util.HashSet;
@@ -37,6 +37,7 @@
 import org.apache.jackrabbit.oak.spi.commit.EditorHook;
 import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
 import org.apache.jackrabbit.oak.spi.commit.VisibleEditor;
+import org.apache.jackrabbit.oak.spi.observation.ObservationScheduler;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 
@@ -52,13 +53,17 @@
 
     private final IndexHookProvider provider;
 
+    private final ObservationScheduler observation;
+
     private final NodeBuilder node;
 
     private Editor inner = new DefaultEditor();
 
-    public IndexHookManagerDiff(IndexHookProvider provider, NodeBuilder node) {
+    public IndexHookManagerDiff(IndexHookProvider provider, NodeBuilder node,
+            ObservationScheduler observation) {
         this.provider = provider;
         this.node = node;
+        this.observation = observation;
     }
 
     @Override
@@ -66,42 +71,51 @@
             throws CommitFailedException {
         NodeState ref = node.getNodeState();
         if (ref.hasChildNode(INDEX_DEFINITIONS_NAME)) {
-            Set<String> existingTypes = new HashSet<String>();
+            Set<String> allTypes = new HashSet<String>();
+            Set<String> asyncTypes = new HashSet<String>();
             Set<String> reindexTypes = new HashSet<String>();
+
             NodeState index = ref.getChildNode(INDEX_DEFINITIONS_NAME);
             for (String indexName : index.getChildNodeNames()) {
                 NodeState indexChild = index.getChildNode(indexName);
-                if (isIndexNodeType(indexChild.getProperty(JCR_PRIMARYTYPE))) {
-                    PropertyState reindexPS = indexChild
-                            .getProperty(REINDEX_PROPERTY_NAME);
-                    boolean reindex = reindexPS == null
-                            || (reindexPS != null && indexChild.getProperty(
-                                    REINDEX_PROPERTY_NAME).getValue(
-                                    Type.BOOLEAN));
-                    String type = TYPE_UNKNOWN;
+                if (isIndexNodeType(indexChild)) {
+                    boolean reindex = getBoolean(indexChild,
+                            REINDEX_PROPERTY_NAME);
+                    boolean async = getBoolean(indexChild, ASYNC_PROPERTY_NAME);
+
+                    String type = null;
                     PropertyState typePS = indexChild
                             .getProperty(TYPE_PROPERTY_NAME);
                     if (typePS != null && !typePS.isArray()) {
                         type = typePS.getValue(Type.STRING);
                     }
+                    if (type == null) {
+                        // skip null types
+                        continue;
+                    }
+
                     if (reindex) {
                         reindexTypes.add(type);
+                    } else if (async) {
+                        asyncTypes.add(type);
                     }
-                    existingTypes.add(type);
+                    allTypes.add(type);
                 }
             }
-            existingTypes.remove(TYPE_UNKNOWN);
-            reindexTypes.remove(TYPE_UNKNOWN);
 
             List<IndexHook> hooks = Lists.newArrayList();
             List<IndexHook> reindex = Lists.newArrayList();
-            for (String type : existingTypes) {
-                List<? extends IndexHook> hooksTmp = provider.getIndexHooks(
-                        type, node);
+            for (String type : allTypes) {
                 if (reindexTypes.contains(type)) {
-                    reindex.addAll(hooksTmp);
+                    reindex.addAll(provider.getIndexHooks(type, node));
+                } else if (asyncTypes.contains(type)) {
+                    if (observation != null) {
+                        // TODO what happens when there is no obs scheduler?
+                        observation.addListener(new AsyncEditorProvider(type,
+                                provider, node));
+                    }
                 } else {
-                    hooks.addAll(hooksTmp);
+                    hooks.addAll(provider.getIndexHooks(type, node));
                 }
             }
             reindex(reindex, ref);
@@ -112,6 +126,55 @@
         }
     }
 
+    private static class AsyncEditorProvider implements EditorProvider {
+
+        private final String type;
+
+        private final IndexHookProvider provider;
+
+        private final NodeBuilder node;
+
+        public AsyncEditorProvider(String type, IndexHookProvider provider,
+                NodeBuilder node) {
+            this.type = type;
+            this.provider = provider;
+            this.node = node;
+        }
+
+        @Override
+        public Editor getRootEditor(NodeState before, NodeState after,
+                NodeBuilder builder) {
+            return VisibleEditor.wrap(CompositeEditor.compose(provider
+                    .getIndexHooks(type, node)));
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((type == null) ? 0 : type.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            AsyncEditorProvider other = (AsyncEditorProvider) obj;
+            if (type == null) {
+                if (other.type != null)
+                    return false;
+            } else if (!type.equals(other.type))
+                return false;
+            return true;
+        }
+
+    }
+
     private void reindex(List<IndexHook> hooks, NodeState state)
             throws CommitFailedException {
         if (hooks.isEmpty()) {
@@ -147,11 +210,6 @@
         this.inner.leave(before, after);
     }
 
-    private static boolean isIndexNodeType(PropertyState ps) {
-        return ps != null && !ps.isArray()
-                && ps.getValue(Type.STRING).equals(INDEX_DEFINITIONS_NODE_TYPE);
-    }
-
     @Override
     public void propertyAdded(PropertyState after) throws CommitFailedException {
         inner.propertyAdded(after);
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexHookManager.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexHookManager.java	(revision 1465953)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexHookManager.java	(working copy)
@@ -19,6 +19,7 @@
 import org.apache.jackrabbit.oak.spi.commit.Editor;
 import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
 import org.apache.jackrabbit.oak.spi.commit.VisibleEditor;
+import org.apache.jackrabbit.oak.spi.observation.ObservationScheduler;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 
@@ -38,6 +39,8 @@
 
     private final IndexHookProvider provider;
 
+    private ObservationScheduler observation;
+
     protected IndexHookManager(IndexHookProvider provider) {
         this.provider = provider;
     }
@@ -45,6 +48,11 @@
     @Override
     public Editor getRootEditor(NodeState before, NodeState after,
             NodeBuilder builder) {
-        return VisibleEditor.wrap(new IndexHookManagerDiff(provider, builder));
+        return VisibleEditor.wrap(new IndexHookManagerDiff(provider, builder, observation));
     }
+
+    public IndexHookManager setObservation(ObservationScheduler observation) {
+        this.observation = observation;
+        return this;
+    }
 }
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java	(revision 1465953)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java	(working copy)
@@ -31,6 +31,8 @@
 
     String REINDEX_PROPERTY_NAME = "reindex";
 
+    String ASYNC_PROPERTY_NAME = "async";
+
     /**
      * Marks a unique property index.
      */
Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/IndexHookManagerTest.java
===================================================================
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/IndexHookManagerTest.java	(revision 1465953)
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/IndexHookManagerTest.java	(working copy)
@@ -26,17 +26,22 @@
 import static org.junit.Assert.assertTrue;
 
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.plugins.index.p2.Property2IndexHookProvider;
 import org.apache.jackrabbit.oak.plugins.index.p2.Property2IndexLookup;
-import org.apache.jackrabbit.oak.spi.commit.Editor;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
 import org.apache.jackrabbit.oak.spi.commit.EditorHook;
-import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.observation.ObservationScheduler;
+import org.apache.jackrabbit.oak.spi.observation.ObservationSchedulerImpl;
 import org.apache.jackrabbit.oak.spi.query.PropertyValues;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableSet;
@@ -265,15 +270,9 @@
                         Type.NAME);
         NodeState after = builder.getNodeState();
 
-        EditorProvider provider = new EditorProvider() {
-            @Override
-            public Editor getRootEditor(NodeState before, NodeState after,
-                    NodeBuilder builder) {
-                return new IndexHookManagerDiff(
-                        new Property2IndexHookProvider(), builder);
-            }
-        };
-        EditorHook hook = new EditorHook(provider);
+        IndexHookManager im = IndexHookManager
+                .of(new Property2IndexHookProvider());
+        EditorHook hook = new EditorHook(im);
         NodeState indexed = hook.processCommit(before, after);
 
         // check that the index content nodes exist
@@ -291,4 +290,113 @@
         return c;
     }
 
+    /**
+     * Async Index Test
+     * <ul>
+     * <li>Add an index definition</li>
+     * <li>Add some content</li>
+     * <li>Search & verify</li>
+     * </ul>
+     * 
+     */
+    @Test
+    public void testAsync() throws Exception {
+        NodeStore store = new MemoryNodeStore();
+        NodeState root = store.getRoot();
+
+        NodeBuilder builder = root.builder();
+
+        builder.child("oak:index")
+                .child("rootIndex")
+                .setProperty("propertyNames", "foo")
+                .setProperty("type", "p2")
+                .setProperty("async", "true")
+                .setProperty(JCR_PRIMARYTYPE, INDEX_DEFINITIONS_NODE_TYPE,
+                        Type.NAME);
+        builder.child("newchild")
+                .child("other")
+                .child("oak:index")
+                .child("subIndex")
+                .setProperty("propertyNames", "foo")
+                .setProperty("type", "p2")
+                .setProperty("async", "true")
+                .setProperty(JCR_PRIMARYTYPE, INDEX_DEFINITIONS_NODE_TYPE,
+                        Type.NAME);
+
+        NodeState before = builder.getNodeState();
+        // Add nodes
+        builder.child("testRoot").setProperty("foo", "abc");
+        builder.child("newchild").child("other").child("testChild")
+                .setProperty("foo", "xyz");
+
+        NodeState after = builder.getNodeState();
+
+        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+        ObservationScheduler observation = new ObservationSchedulerImpl(
+                executor, store);
+
+        IndexHookManager im = IndexHookManager.of(
+                new Property2IndexHookProvider()).setObservation(observation);
+        EditorHook hook = new EditorHook(im);
+        NodeState indexed = hook.processCommit(before, after);
+
+        executor.shutdown();
+        for (int i = 0; i < 3 && !executor.isTerminated(); i++) {
+            TimeUnit.SECONDS.sleep(1);
+        }
+        assertTrue("Waited 3 seconds and indexing is not done",
+                executor.isTerminated());
+
+        // first check that the index content nodes exist
+        checkPathExists(indexed, "oak:index", "rootIndex", ":index");
+        checkPathExists(indexed, "newchild", "other", "oak:index", "subIndex",
+                ":index");
+
+        Property2IndexLookup lookup = new Property2IndexLookup(indexed);
+        assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc"));
+
+        Property2IndexLookup lookupChild = new Property2IndexLookup(indexed
+                .getChildNode("newchild").getChildNode("other"));
+        assertEquals(ImmutableSet.of("testChild"),
+                find(lookupChild, "foo", "xyz"));
+        assertEquals(ImmutableSet.of(), find(lookupChild, "foo", "abc"));
+
+    }
+
+    /**
+     * Async Index Test - no scheduler -> no index
+     */
+    @Test
+    public void testAsyncNoScheduler() throws Exception {
+        NodeStore store = new MemoryNodeStore();
+        NodeState root = store.getRoot();
+
+        NodeBuilder builder = root.builder();
+
+        builder.child("oak:index")
+                .child("rootIndex")
+                .setProperty("propertyNames", "foo")
+                .setProperty("type", "p2")
+                .setProperty("reindex", "false")
+                .setProperty("async", "true")
+                .setProperty(JCR_PRIMARYTYPE, INDEX_DEFINITIONS_NODE_TYPE,
+                        Type.NAME);
+
+        NodeState before = builder.getNodeState();
+        // Add nodes
+        builder.child("testRoot").setProperty("foo", "abc");
+
+        NodeState after = builder.getNodeState();
+
+        IndexHookManager im = IndexHookManager
+                .of(new Property2IndexHookProvider());
+        EditorHook hook = new EditorHook(im);
+        NodeState indexed = hook.processCommit(before, after);
+
+        // check that the index content nodes not exist
+        assertFalse(checkPathExists(indexed, "oak:index", "rootIndex")
+                .hasChildNode(":index"));
+
+    }
+
 }
