Index: src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexHookManager.java
===================================================================
--- src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexHookManager.java	(revision 0)
+++ src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexHookManager.java	(revision 0)
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.getBoolean;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.isIndexNodeType;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
+import org.apache.jackrabbit.oak.spi.commit.CompositeEditor;
+import org.apache.jackrabbit.oak.spi.commit.DefaultEditor;
+import org.apache.jackrabbit.oak.spi.commit.Editor;
+import org.apache.jackrabbit.oak.spi.commit.EditorHook;
+import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.commit.VisibleEditor;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AsyncIndexHookManager implements Runnable {
+
+    private static final Logger log = LoggerFactory
+            .getLogger(AsyncIndexHookManager.class);
+
+    // TODO index impl run frequency could be picked up from the index config
+    // directly
+
+    private final NodeStore store;
+
+    private final ScheduledExecutorService executor;
+
+    private final IndexHookProvider provider;
+
+    private NodeState current = EmptyNodeState.EMPTY_NODE;
+
+    final Map<String, IndexTask> active = new ConcurrentHashMap<String, IndexTask>();
+
+    public AsyncIndexHookManager(@Nonnull NodeStore store,
+            @Nonnull ScheduledExecutorService executor,
+            @Nonnull IndexHookProvider provider) {
+        this.store = checkNotNull(store);
+        this.executor = checkNotNull(executor);
+        this.provider = checkNotNull(provider);
+    }
+
+    public synchronized void start() {
+        executor.scheduleWithFixedDelay(this, 100, 1000, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void run() {
+        log.debug("Running background index config watcher task");
+        NodeState after = store.getRoot();
+        try {
+            EditorHook hook = new EditorHook(new EditorProvider() {
+                @Override
+                public Editor getRootEditor(NodeState before, NodeState after,
+                        NodeBuilder builder) {
+                    return VisibleEditor.wrap(new IndexConfigEditor());
+                }
+            });
+            hook.processCommit(current, after);
+            current = after;
+        } catch (CommitFailedException e) {
+            log.warn("IndexTask update failed", e);
+        }
+    }
+
+    public synchronized void replace(Set<String> async) {
+        Set<String> in = new HashSet<String>(async);
+        Set<String> existing = active.keySet();
+        for (String type : existing) {
+            if (in.remove(type)) {
+                addOrUpdate(type);
+            } else {
+                remove(type);
+            }
+        }
+        for (String type : in) {
+            addOrUpdate(type);
+        }
+    }
+
+    void addOrUpdate(String type) {
+        IndexTask task = active.get(type);
+        if (task != null) {
+            // stop existing one
+            task.stop();
+        }
+        task = new IndexTask(store, provider, type);
+        active.put(type, task);
+        task.start(executor);
+    }
+
+    void remove(String type) {
+        IndexTask task = active.remove(type);
+        if (task != null) {
+            task.stop();
+        }
+    }
+
+    class IndexConfigEditor extends DefaultEditor {
+
+        private final Set<String> async;
+
+        public IndexConfigEditor() {
+            async = new HashSet<String>();
+        }
+
+        @Override
+        public void enter(NodeState before, NodeState after)
+                throws CommitFailedException {
+            if (!after.hasChildNode(INDEX_DEFINITIONS_NAME)) {
+                return;
+            }
+            NodeState index = after.getChildNode(INDEX_DEFINITIONS_NAME);
+            for (String indexName : index.getChildNodeNames()) {
+                NodeState indexChild = index.getChildNode(indexName);
+                if (isIndexNodeType(indexChild)) {
+                    boolean isasync = getBoolean(indexChild,
+                            ASYNC_PROPERTY_NAME, false);
+                    String type = null;
+                    PropertyState typePS = indexChild
+                            .getProperty(TYPE_PROPERTY_NAME);
+                    if (typePS != null && !typePS.isArray()) {
+                        type = typePS.getValue(Type.STRING);
+                    }
+                    if (type == null || !isasync) {
+                        // skip null and non-async types
+                        continue;
+                    }
+                    async.add(type);
+                }
+            }
+        }
+
+        @Override
+        public void leave(NodeState before, NodeState after)
+                throws CommitFailedException {
+            replace(async);
+            async.clear();
+        }
+
+    }
+
+    static class IndexTask implements Runnable {
+
+        private static final Logger log = LoggerFactory
+                .getLogger(IndexTask.class);
+
+        private final NodeStore store;
+
+        private final String type;
+
+        private final IndexHookProvider provider;
+
+        private ScheduledFuture<?> future;
+
+        private NodeState before;
+
+        public IndexTask(NodeStore store, IndexHookProvider provider,
+                String type) {
+            this.store = store;
+            this.provider = provider;
+            this.type = type;
+            this.before = EmptyNodeState.EMPTY_NODE;
+        }
+
+        public synchronized void start(ScheduledExecutorService executor) {
+            if (future != null) {
+                throw new IllegalStateException("IndexTask has already started");
+            }
+            future = executor.scheduleWithFixedDelay(this, 100, 1000,
+                    TimeUnit.MILLISECONDS);
+        }
+
+        public synchronized void stop() {
+            if (future == null) {
+                log.warn("IndexTask has already stopped.");
+            }
+            future.cancel(true);
+        }
+
+        @Override
+        public void run() {
+            log.debug("Running background index task");
+            NodeStoreBranch branch = store.branch();
+            NodeState after = branch.getHead();
+            try {
+                EditorHook hook = new EditorHook(new TypedEditorProvider(
+                        provider, type));
+                NodeState processed = hook.processCommit(before, after);
+
+                branch.setRoot(processed);
+                branch.merge(EmptyHook.INSTANCE);
+                before = after;
+            } catch (CommitFailedException e) {
+                log.warn("IndexTask update failed", e);
+            }
+        }
+    }
+
+    /**
+     * This creates a composite editor from a type-filtered index provider.
+     * 
+     */
+    private static class TypedEditorProvider implements EditorProvider {
+
+        private final IndexHookProvider provider;
+
+        private final String type;
+
+        public TypedEditorProvider(IndexHookProvider provider, String type) {
+            this.type = type;
+            this.provider = provider;
+        }
+
+        /**
+         * This does not make any effort to filter async definitions. The
+         * assumption is that given an index type, all of the returned index
+         * hooks inherit the same async assumption.
+         * 
+         */
+        @Override
+        public Editor getRootEditor(NodeState before, NodeState after,
+                NodeBuilder builder) {
+            return VisibleEditor.wrap(CompositeEditor.compose(provider
+                    .getIndexHooks(type, builder)));
+        }
+    }
+
+}
Index: src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexHookManagerTest.java
===================================================================
--- src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexHookManagerTest.java	(revision 0)
+++ src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexHookManagerTest.java	(revision 0)
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexHookManager.IndexTask;
+import org.apache.jackrabbit.oak.plugins.index.p2.Property2IndexHookProvider;
+import org.apache.jackrabbit.oak.plugins.index.p2.Property2IndexLookup;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.query.PropertyValues;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStoreBranch;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class AsyncIndexHookManagerTest {
+
+    // TODO test index config deletes
+
+    private static Set<String> find(Property2IndexLookup lookup, String name,
+            String value) {
+        return Sets.newHashSet(lookup.query(null, name,
+                PropertyValues.newString(value)));
+    }
+
+    private static NodeState checkPathExists(NodeState state, String... verify) {
+        NodeState c = state;
+        for (String p : verify) {
+            c = c.getChildNode(p);
+            assertTrue(c.exists());
+        }
+        return c;
+    }
+
+    /**
+     * Async Index Test
+     * <ul>
+     * <li>Add an index definition</li>
+     * <li>Add some content</li>
+     * <li>Search & verify</li>
+     * </ul>
+     * 
+     */
+    @Test
+    public void testAsync() throws Exception {
+        NodeStore store = new MemoryNodeStore();
+        ScheduledExecutorService executor = Executors.newScheduledThreadPool(0);
+        IndexHookProvider provider = new Property2IndexHookProvider();
+
+        NodeStoreBranch branch = store.branch();
+        NodeState root = branch.getHead();
+        NodeBuilder builder = root.builder();
+        createIndexDefinition(builder.child("oak:index"), "rootIndex", true,
+                false, ImmutableSet.of("foo"), null).setProperty(
+                ASYNC_PROPERTY_NAME, true);
+        builder.child("testRoot").setProperty("foo", "abc");
+
+        // merge it back in
+        branch.setRoot(builder.getNodeState());
+        branch.merge(EmptyHook.INSTANCE);
+
+        AsyncIndexHookManager async = new AsyncIndexHookManager(store,
+                executor, provider);
+        runIndexing(async, 1);
+        root = store.getRoot();
+
+        // first check that the index content nodes exist
+        checkPathExists(root, "oak:index", "rootIndex", ":index");
+
+        Property2IndexLookup lookup = new Property2IndexLookup(root);
+        assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc"));
+    }
+
+    private static void runIndexing(AsyncIndexHookManager async,
+            int expectedTypes) {
+        async.run();
+        Map<String, IndexTask> active = async.active;
+        assertEquals(expectedTypes, active.size());
+        for (IndexTask task : active.values()) {
+            task.run();
+        }
+    }
+
+    /**
+     * Async Index Test with 2 index defs at the same location
+     * <ul>
+     * <li>Add an index definition</li>
+     * <li>Add some content</li>
+     * <li>Search & verify</li>
+     * </ul>
+     * 
+     */
+    @Test
+    public void testAsyncDouble() throws Exception {
+        NodeStore store = new MemoryNodeStore();
+        ScheduledExecutorService executor = Executors.newScheduledThreadPool(0);
+        IndexHookProvider provider = new Property2IndexHookProvider();
+
+        NodeStoreBranch branch = store.branch();
+        NodeState root = branch.getHead();
+        NodeBuilder builder = root.builder();
+        createIndexDefinition(builder.child("oak:index"), "rootIndex", true,
+                false, ImmutableSet.of("foo"), null).setProperty(
+                ASYNC_PROPERTY_NAME, true);
+        createIndexDefinition(builder.child("oak:index"), "rootIndexSecond",
+                true, false, ImmutableSet.of("bar"), null).setProperty(
+                ASYNC_PROPERTY_NAME, true);
+
+        builder.child("testRoot").setProperty("foo", "abc")
+                .setProperty("bar", "def");
+        builder.child("testSecond").setProperty("bar", "ghi");
+
+        // merge it back in
+        branch.setRoot(builder.getNodeState());
+        branch.merge(EmptyHook.INSTANCE);
+
+        AsyncIndexHookManager async = new AsyncIndexHookManager(store,
+                executor, provider);
+        runIndexing(async, 1);
+        root = store.getRoot();
+
+        // first check that the index content nodes exist
+        checkPathExists(root, "oak:index", "rootIndex", ":index");
+        checkPathExists(root, "oak:index", "rootIndexSecond", ":index");
+
+        Property2IndexLookup lookup = new Property2IndexLookup(root);
+        assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc"));
+        assertEquals(ImmutableSet.of(), find(lookup, "foo", "def"));
+        assertEquals(ImmutableSet.of(), find(lookup, "foo", "ghi"));
+
+        assertEquals(ImmutableSet.of(), find(lookup, "bar", "abc"));
+        assertEquals(ImmutableSet.of("testRoot"), find(lookup, "bar", "def"));
+        assertEquals(ImmutableSet.of("testSecond"), find(lookup, "bar", "ghi"));
+
+    }
+
+    /**
+     * Async Index Test with 2 index defs at different tree locations
+     * <ul>
+     * <li>Add an index definition</li>
+     * <li>Add some content</li>
+     * <li>Search & verify</li>
+     * </ul>
+     * 
+     */
+    @Test
+    public void testAsyncDoubleSubtree() throws Exception {
+        NodeStore store = new MemoryNodeStore();
+        ScheduledExecutorService executor = Executors.newScheduledThreadPool(0);
+        IndexHookProvider provider = new Property2IndexHookProvider();
+
+        NodeStoreBranch branch = store.branch();
+        NodeState root = branch.getHead();
+        NodeBuilder builder = root.builder();
+        createIndexDefinition(builder.child("oak:index"), "rootIndex", true,
+                false, ImmutableSet.of("foo"), null).setProperty(
+                ASYNC_PROPERTY_NAME, true);
+        createIndexDefinition(
+                builder.child("newchild").child("other").child("oak:index"),
+                "subIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, true);
+
+        builder.child("testRoot").setProperty("foo", "abc");
+        builder.child("newchild").child("other").child("testChild")
+                .setProperty("foo", "xyz");
+
+        // merge it back in
+        branch.setRoot(builder.getNodeState());
+        branch.merge(EmptyHook.INSTANCE);
+
+        AsyncIndexHookManager async = new AsyncIndexHookManager(store,
+                executor, provider);
+        runIndexing(async, 1);
+        root = store.getRoot();
+
+        // first check that the index content nodes exist
+        checkPathExists(root, "oak:index", "rootIndex", ":index");
+        checkPathExists(root, "newchild", "other", "oak:index", "subIndex",
+                ":index");
+
+        Property2IndexLookup lookup = new Property2IndexLookup(root);
+        assertEquals(ImmutableSet.of("testRoot"), find(lookup, "foo", "abc"));
+
+        Property2IndexLookup lookupChild = new Property2IndexLookup(root
+                .getChildNode("newchild").getChildNode("other"));
+        assertEquals(ImmutableSet.of("testChild"),
+                find(lookupChild, "foo", "xyz"));
+        assertEquals(ImmutableSet.of(), find(lookupChild, "foo", "abc"));
+    }
+
+}
