diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/CacheChangesTracker.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/CacheChangesTracker.java
index 64a01fa..8f69c96 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/CacheChangesTracker.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/cache/CacheChangesTracker.java
@@ -23,9 +23,10 @@ import com.google.common.hash.PrimitiveSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.util.List;
 
-public class CacheChangesTracker {
+public class CacheChangesTracker implements Closeable{
 
     private static final Logger LOG = LoggerFactory.getLogger(CacheChangesTracker.class);
 
@@ -62,6 +63,7 @@ public class CacheChangesTracker {
         return keyFilter.apply(key) && lazyBloomFilter.mightContain(key);
     }
 
+    @Override
     public void close() {
         changeTrackers.remove(this);
 
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentTraverser.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentTraverser.java
new file mode 100644
index 0000000..a2cd351
--- /dev/null
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentTraverser.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import java.util.function.Predicate;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.io.Closer;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.document.cache.CacheChangesTracker;
+import org.apache.jackrabbit.oak.plugins.document.cache.NodeDocumentCache;
+import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
+
+import static java.util.Collections.singletonList;
+
+public class MongoDocumentTraverser {
+    private final MongoDocumentStore mongoStore;
+
+    public MongoDocumentTraverser(MongoDocumentStore mongoStore) {
+        this.mongoStore = mongoStore;
+    }
+
+    public <T extends Document> CloseableIterable<T> getAllDocuments(Collection<T> collection, Predicate<String> filter) {
+        //TODO Handle readOnly
+        boolean readOnly = true;
+        DBCollection dbCollection = mongoStore.getDBCollection(collection);
+        Closer closer = Closer.create();
+        CacheChangesTracker cacheChangesTracker;
+        if (collection == Collection.NODES && !readOnly) {
+            cacheChangesTracker = getNodeDocCache().registerTracker(NodeDocument.MIN_ID_VALUE, NodeDocument.MAX_ID_VALUE);
+            closer.register(cacheChangesTracker);
+        } else {
+            cacheChangesTracker = null;
+        }
+
+        DBCursor cursor = dbCollection.find();
+        //TODO This may lead to reads being routed to secondary depending on MongoURI
+        //So caller must ensure that its safe to read from secondary
+        cursor.setReadPreference(mongoStore.getConfiguredReadPreference(collection));
+        closer.register(cursor);
+
+        @SuppressWarnings("Guava")
+        Iterable<T> result = FluentIterable.from(cursor)
+                .filter(o -> filter.test((String) o.get("_id")))
+                .transform(o -> {
+                    T doc = mongoStore.convertFromDBObject(collection, o);
+                    //TODO Review the cache update approach where tracker has to track *all* docs
+                    if (collection == Collection.NODES) {
+                        NodeDocument nodeDoc = (NodeDocument) doc;
+                        if (readOnly) {
+                            getNodeDocCache().put(nodeDoc);
+                        }
+                        getNodeDocCache().putNonConflictingDocs(cacheChangesTracker, singletonList(nodeDoc));
+                    }
+                    return doc;
+                });
+        return CloseableIterable.wrap(result, closer);
+    }
+
+    private NodeDocumentCache getNodeDocCache() {
+        return mongoStore.getNodeDocumentCache();
+    }
+}
diff --git oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
index a37fccc..7c4015c 100644
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/BasicDocumentStoreTest.java
@@ -16,12 +16,17 @@
  */
 package org.apache.jackrabbit.oak.plugins.document;
 
+import static java.util.Arrays.asList;
+import static org.apache.jackrabbit.oak.plugins.document.util.Utils.getPathFromId;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.Charset;
@@ -32,10 +37,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Condition;
 import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
+import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
 import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1109,6 +1122,34 @@ public class BasicDocumentStoreTest extends AbstractDocumentStoreTest {
         assertNull(ds.getIfCached(Collection.NODES, id));
     }
 
+    @Test
+    public void getAllDocuments() throws Exception{
+        assumeTrue(ds instanceof MongoDocumentStore);
+        ds.create(Collection.NODES, asList(
+                newDocument("/a/b", 1),
+                newDocument("/a/c", 1),
+                newDocument("/d", 1)
+        ));
+
+        ds.invalidateCache();
+
+        MongoDocumentTraverser traverser = new MongoDocumentTraverser((MongoDocumentStore) ds);
+        CloseableIterable<NodeDocument> itr = traverser.getAllDocuments(Collection.NODES, id -> getPathFromId(id).startsWith("/a"));
+        Set<String> paths = StreamSupport.stream(itr.spliterator(), false)
+                .map(NodeDocument::getPath)
+                .collect(Collectors.toSet());
+
+        itr.close();
+        assertThat(paths, containsInAnyOrder("/a/b", "/a/c"));
+
+
+        assertNotNull(ds.getIfCached(Collection.NODES, Utils.getIdFromPath("/a/b")));
+        assertNotNull(ds.getIfCached(Collection.NODES, Utils.getIdFromPath("/a/c")));
+
+        // Excluded id should not be cached
+        assertNull(ds.getIfCached(Collection.NODES, Utils.getIdFromPath("/d")));
+    }
+
     private UpdateOp newDocument(String path, long modified) {
         String id = Utils.getIdFromPath(path);
         UpdateOp op = new UpdateOp(id, true);
diff --git oak-run/pom.xml oak-run/pom.xml
index f9b6d5c..1d396e5 100644
--- oak-run/pom.xml
+++ oak-run/pom.xml
@@ -249,6 +249,11 @@
       <artifactId>mongo-java-driver</artifactId>
     </dependency>
     <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>1.5</version>
+    </dependency>
+    <dependency>
       <groupId>org.mapdb</groupId>
       <artifactId>mapdb</artifactId>
       <version>1.0.6</version>
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
index 760ffdb..382fd05 100644
--- oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexCommand.java
@@ -23,11 +23,13 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.io.Closer;
 import joptsimple.OptionParser;
 import org.apache.commons.io.FileUtils;
 import org.apache.felix.inventory.Format;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.index.indexer.path.DocumentStoreIndexer;
 import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
 import org.apache.jackrabbit.oak.run.cli.CommonOptions;
 import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
@@ -35,8 +37,7 @@ import org.apache.jackrabbit.oak.run.cli.Options;
 import org.apache.jackrabbit.oak.run.commons.Command;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
-import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,8 +77,7 @@ public class IndexCommand implements Command {
         NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
         try (Closer closer = Closer.create()) {
             closer.register(fixture);
-            StatisticsProvider statisticsProvider = WhiteboardUtils.getService(fixture.getWhiteboard(), StatisticsProvider.class);
-            execute(fixture.getStore(), fixture.getBlobStore(), statisticsProvider, indexOpts, closer);
+            execute(fixture.getStore(), fixture.getBlobStore(), fixture.getWhiteboard(), indexOpts, closer);
             tellReportPaths();
         }
     }
@@ -96,9 +96,9 @@ public class IndexCommand implements Command {
         }
     }
 
-    private void execute(NodeStore store, BlobStore blobStore, StatisticsProvider statisticsProvider,
+    private void execute(NodeStore store, BlobStore blobStore, Whiteboard whiteboard,
                          IndexOptions indexOpts, Closer closer) throws IOException, CommitFailedException {
-        IndexHelper indexHelper = new IndexHelper(store, blobStore, statisticsProvider, indexOpts.getOutDir(),
+        IndexHelper indexHelper = new IndexHelper(store, blobStore, whiteboard, indexOpts.getOutDir(),
                 indexOpts.getWorkDir(), indexOpts.getIndexPaths());
 
         closer.register(indexHelper);
@@ -120,9 +120,22 @@ public class IndexCommand implements Command {
         } else {
             String checkpoint = indexOpts.getCheckpoint();
             checkNotNull(checkpoint, "Checkpoint value is required for reindexing done in read only mode");
-            try (OutOfBandIndexer indexer = new OutOfBandIndexer(indexHelper, checkpoint)) {
-                indexer.reindex();
+
+            Stopwatch w = Stopwatch.createStarted();
+            IndexerSupport indexerSupport = new IndexerSupport(indexHelper, checkpoint);
+            if (opts.getCommonOpts().isDocument()) {
+                try (DocumentStoreIndexer indexer = new DocumentStoreIndexer(indexHelper, indexerSupport)) {
+                    indexer.reindex();
+                }
+            } else {
+                try (OutOfBandIndexer indexer = new OutOfBandIndexer(indexHelper, indexerSupport)) {
+                    indexer.reindex();
+                }
             }
+            indexerSupport.writeMetaInfo(checkpoint);
+            File destDir = indexerSupport.copyIndexFilesToOutput();
+            log.info("Indexing completed for indexes {} in {} and index files are copied to {}",
+                    indexHelper.getIndexPaths(), w, destDir);
         }
     }
 
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java
index cf4603d..b5f29b9 100644
--- oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexHelper.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
 import com.google.common.collect.ImmutableList;
@@ -50,11 +51,15 @@ import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.mount.MountInfoProvider;
 import org.apache.jackrabbit.oak.spi.mount.Mounts;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class IndexHelper implements Closeable{
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class IndexHelper implements Closeable{
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final NodeStore store;
     private final File outputDir;
@@ -63,17 +68,19 @@ class IndexHelper implements Closeable{
     private IndexPathService indexPathService;
     private AsyncIndexInfoService asyncIndexInfoService;
     private final List<String> indexPaths;
+    private final Whiteboard whiteboard;
     private LuceneIndexHelper luceneIndexHelper;
     private Executor executor;
     private final Closer closer = Closer.create();
     private final BlobStore blobStore;
     private final StatisticsProvider statisticsProvider;
 
-    IndexHelper(NodeStore store, BlobStore blobStore, StatisticsProvider statisticsProvider,
+    IndexHelper(NodeStore store, BlobStore blobStore, Whiteboard whiteboard,
                 File outputDir, File workDir, List<String> indexPaths) {
         this.store = store;
         this.blobStore = blobStore;
-        this.statisticsProvider = statisticsProvider;
+        this.whiteboard = whiteboard;
+        this.statisticsProvider = checkNotNull(WhiteboardUtils.getService(whiteboard, StatisticsProvider.class));
         this.outputDir = outputDir;
         this.workDir = workDir;
         this.indexPaths = ImmutableList.copyOf(indexPaths);
@@ -143,6 +150,11 @@ class IndexHelper implements Closeable{
         return luceneIndexHelper;
     }
 
+    @CheckForNull
+    public <T> T getService(@Nonnull Class<T> type) {
+        return WhiteboardUtils.getService(whiteboard, type);
+    }
+
     @Override
     public void close() throws IOException {
         closer.close();
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java
new file mode 100644
index 0000000..68b6ddb
--- /dev/null
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class IndexerSupport {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    /**
+     * Directory name in output directory under which indexes are
+     * stored
+     */
+    public static final String LOCAL_INDEX_ROOT_DIR = "indexes";
+    /**
+     * File name stored in final index directory which contains meta
+     * information like checkpoint details. This can be used by
+     * importer while importing the indexes
+     */
+    private static final String INDEXER_META = "indexer-info.txt";
+    /**
+     * Checkpoint value which indicate that head state needs to be used
+     * This would be mostly used for testing purpose
+     */
+    private static final String HEAD_AS_CHECKPOINT = "head";
+    private Map<String, String> checkpointInfo = Collections.emptyMap();
+    private final IndexHelper indexHelper;
+    private File localIndexDir;
+    private String checkpoint;
+
+    public IndexerSupport(IndexHelper indexHelper, String checkpoint) {
+        this.indexHelper = indexHelper;
+        this.checkpoint = checkpoint;
+    }
+
+    public File getLocalIndexDir() throws IOException {
+        if (localIndexDir == null) {
+            localIndexDir = new File(indexHelper.getWorkDir(), LOCAL_INDEX_ROOT_DIR);
+            FileUtils.forceMkdir(localIndexDir);
+        }
+        return localIndexDir;
+    }
+
+    public File copyIndexFilesToOutput() throws IOException {
+        File destDir = new File(indexHelper.getOutputDir(), getLocalIndexDir().getName());
+        FileUtils.moveDirectoryToDirectory(getLocalIndexDir(), indexHelper.getOutputDir(), true);
+        return destDir;
+    }
+
+    public void writeMetaInfo(String checkpoint) throws IOException {
+        Properties props = new Properties();
+        props.put("checkpoint", checkpoint);
+        try (OutputStream os = FileUtils.openOutputStream(new File(getLocalIndexDir(), INDEXER_META))) {
+            props.store(os, "Indexer info");
+        }
+    }
+
+    public NodeState retrieveNodeStateForCheckpoint() {
+        NodeState checkpointedState;
+        if (HEAD_AS_CHECKPOINT.equals(checkpoint)) {
+            checkpointedState = indexHelper.getNodeStore().getRoot();
+            log.warn("Using head state for indexing. Such an index cannot be imported back");
+        } else {
+            checkpointedState = indexHelper.getNodeStore().retrieve(checkpoint);
+            checkNotNull(checkpointedState, "Not able to retrieve revision referred via checkpoint [%s]", checkpoint);
+            checkpointInfo = indexHelper.getNodeStore().checkpointInfo(checkpoint);
+        }
+        return checkpointedState;
+    }
+
+    public Map<String, String> getCheckpointInfo() {
+        return checkpointInfo;
+    }
+}
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexer.java oak-run/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexer.java
index b568324..ebcbe6d 100644
--- oak-run/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexer.java
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/index/OutOfBandIndexer.java
@@ -41,7 +41,6 @@ import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdate;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
 import org.apache.jackrabbit.oak.plugins.index.NodeTraversalCallback;
-import org.apache.jackrabbit.oak.plugins.index.counter.jmx.NodeCounter;
 import org.apache.jackrabbit.oak.plugins.index.lucene.directory.DirectoryFactory;
 import org.apache.jackrabbit.oak.plugins.index.lucene.directory.FSDirectoryFactory;
 import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
@@ -80,66 +79,44 @@ public class OutOfBandIndexer implements Closeable, IndexUpdateCallback, NodeTra
      * i.e. when a sync index is reindexed in out of band mode
      */
     private static final String ASYNC_PREVIOUS_NONE = "none";
-    /**
-     * Directory name in output directory under which indexes are
-     * stored
-     */
-    public static final String LOCAL_INDEX_ROOT_DIR = "indexes";
-    /**
-     * File name stored in final index directory which contains meta
-     * information like checkpoint details. This can be used by
-     * importer while importing the indexes
-     */
-    private static final String INDEXER_META = "indexer-info.txt";
 
-    /**
-     * Checkpoint value which indicate that head state needs to be used
-     * This would be mostly used for testing purpose
-     */
-    private static final String HEAD_AS_CHECKPOINT = "head";
+
 
     private final Closer closer = Closer.create();
     private final IndexHelper indexHelper;
-    private final String checkpoint;
-    private Map<String, String> checkpointInfo = Collections.emptyMap();
     private NodeStore copyOnWriteStore;
     private LuceneIndexHelper luceneIndexHelper;
-    private File localIndexDir;
+    private IndexerSupport indexerSupport;
 
     //TODO Support for providing custom index definition i.e. where definition is not
     //present in target repository
 
-    public OutOfBandIndexer(IndexHelper indexHelper, String checkpoint) {
+    public OutOfBandIndexer(IndexHelper indexHelper, IndexerSupport indexerSupport) {
         this.indexHelper = checkNotNull(indexHelper);
-        this.checkpoint = checkNotNull(checkpoint);
+        this.indexerSupport = checkNotNull(indexerSupport);
     }
 
     public void reindex() throws CommitFailedException, IOException {
         Stopwatch w = Stopwatch.createStarted();
 
-        NodeState checkpointedState = retrieveNodeStateForCheckpoint();
+        NodeState checkpointedState = indexerSupport.retrieveNodeStateForCheckpoint();
 
         copyOnWriteStore = new MemoryNodeStore(checkpointedState);
         NodeState baseState = copyOnWriteStore.getRoot();
         //TODO Check for indexPaths being empty
 
-        log.info("Proceeding to index {} upto checkpoint {} {}", indexHelper.getIndexPaths(), checkpoint, checkpointInfo);
+
 
         switchIndexLanesAndReindexFlag();
         preformIndexUpdate(baseState);
-        writeMetaInfo();
-        File destDir = copyIndexFilesToOutput();
+        File destDir = indexerSupport.copyIndexFilesToOutput();
 
         log.info("Indexing completed for indexes {} in {} and index files are copied to {}",
                 indexHelper.getIndexPaths(), w, IndexCommand.getPath(destDir));
     }
 
     private File getLocalIndexDir() throws IOException {
-        if (localIndexDir == null) {
-            localIndexDir = new File(indexHelper.getWorkDir(), LOCAL_INDEX_ROOT_DIR);
-            FileUtils.forceMkdir(localIndexDir);
-        }
-        return localIndexDir;
+        return indexerSupport.getLocalIndexDir();
     }
 
     @Override
@@ -219,19 +196,6 @@ public class OutOfBandIndexer implements Closeable, IndexUpdateCallback, NodeTra
         log.info("Switched the async lane for indexes at {} to {} and marked them for reindex", indexHelper.getIndexPaths(), REINDEX_LANE);
     }
 
-    private NodeState retrieveNodeStateForCheckpoint() {
-        NodeState checkpointedState;
-        if (HEAD_AS_CHECKPOINT.equals(checkpoint)) {
-            checkpointedState = indexHelper.getNodeStore().getRoot();
-            log.warn("Using head state for indexing. Such an index cannot be imported back");
-        } else {
-            checkpointedState = indexHelper.getNodeStore().retrieve(checkpoint);
-            checkNotNull(checkpointedState, "Not able to retrieve revision referred via checkpoint [%s]", checkpoint);
-            checkpointInfo = indexHelper.getNodeStore().checkpointInfo(checkpoint);
-        }
-        return checkpointedState;
-    }
-
     /**
      * Make a copy of current async value and replace it with one required for offline reindexing
      */
@@ -255,20 +219,6 @@ public class OutOfBandIndexer implements Closeable, IndexUpdateCallback, NodeTra
         idxBuilder.setProperty(newAsyncState);
     }
 
-    private void writeMetaInfo() throws IOException {
-        Properties props = new Properties();
-        props.put("checkpoint", checkpoint);
-        try (OutputStream os = FileUtils.openOutputStream(new File(getLocalIndexDir(), INDEXER_META))) {
-            props.store(os, "Indexer info");
-        }
-    }
-
-    private File copyIndexFilesToOutput() throws IOException {
-        File destDir = new File(indexHelper.getOutputDir(), getLocalIndexDir().getName());
-        FileUtils.moveDirectoryToDirectory(getLocalIndexDir(), indexHelper.getOutputDir(), true);
-        return destDir;
-    }
-
     private void configureEstimators(IndexUpdate indexUpdate) {
         StatisticsProvider statsProvider = indexHelper.getStatisticsProvider();
         if (statsProvider instanceof MetricStatisticsProvider) {
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/CompositeIndexer.java oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/CompositeIndexer.java
new file mode 100644
index 0000000..1246ab0
--- /dev/null
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/CompositeIndexer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index.indexer.path;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class CompositeIndexer implements NodeStateIndexer {
+    private final List<NodeStateIndexer> indexers;
+
+    public CompositeIndexer(List<NodeStateIndexer> indexers) {
+        this.indexers = checkNotNull(indexers);
+    }
+
+    public boolean isEmpty() {
+        return indexers.isEmpty();
+    }
+
+    @Override
+    public boolean shouldInclude(String path) {
+        return indexers.stream().anyMatch(indexer -> indexer.shouldInclude(path));
+    }
+
+    @Override
+    public boolean shouldInclude(NodeDocument doc) {
+        return indexers.stream().anyMatch(indexer -> indexer.shouldInclude(doc));
+    }
+
+    @Override
+    public void index(NodeStateEntry entry) throws IOException, CommitFailedException {
+        for (NodeStateIndexer indexer : indexers) {
+            indexer.index(entry);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+}
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/DocumentStoreIndexer.java oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/DocumentStoreIndexer.java
new file mode 100644
index 0000000..014233b
--- /dev/null
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/DocumentStoreIndexer.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index.indexer.path;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Closer;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.index.IndexHelper;
+import org.apache.jackrabbit.oak.index.IndexerSupport;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeState;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentTraverser;
+import org.apache.jackrabbit.oak.plugins.document.util.CloseableIterable;
+import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
+import org.apache.jackrabbit.oak.plugins.index.NodeTraversalCallback;
+import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
+import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
+import org.apache.jackrabbit.oak.plugins.metric.MetricStatisticsProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
+
+public class DocumentStoreIndexer implements Closeable{
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final Logger traversalLog = LoggerFactory.getLogger(DocumentStoreIndexer.class.getName()+".traversal");
+    private final Closer closer = Closer.create();
+    private final IndexHelper indexHelper;
+    private final List<NodeStateIndexerProvider> indexerProviders;
+    private final IndexerSupport indexerSupport;
+    private final IndexingProgressReporter progressReporter =
+            new IndexingProgressReporter(IndexUpdateCallback.NOOP, NodeTraversalCallback.NOOP);
+
+    public DocumentStoreIndexer(IndexHelper indexHelper, IndexerSupport indexerSupport) throws IOException {
+        this.indexHelper = indexHelper;
+        this.indexerSupport = indexerSupport;
+        this.indexerProviders = createProviders();
+    }
+
+    public void reindex() throws CommitFailedException, IOException {
+        configureEstimators();
+        CompositeIndexer indexer = prepareIndexers();
+        if (indexer.isEmpty()) {
+            return;
+        }
+
+        closer.register(indexer);
+
+        //TODO How to ensure we can safely read from secondary
+        NodeState checkpointedState = indexerSupport.retrieveNodeStateForCheckpoint();
+        DocumentNodeState rootDocumentState = (DocumentNodeState) checkpointedState;
+        DocumentNodeStore nodeStore = (DocumentNodeStore) indexHelper.getNodeStore();
+
+        progressReporter.reindexingTraversalStart("/");
+        for (NodeDocument doc : getIncludedDocs(indexer)) {
+            String path = doc.getPath();
+
+            DocumentNodeState nodeState = nodeStore.getNode(path, rootDocumentState.getRootRevision());
+
+            //At DocumentNodeState api level the nodeState can be null
+            if (nodeState == null || !nodeState.exists()) {
+                continue;
+            }
+
+            //TODO Handle bundled NodeStates
+
+            NodeStateEntry entry = new NodeStateEntry(nodeState, path);
+            indexer.index(entry);
+        }
+
+        progressReporter.reindexingTraversalEnd();
+        progressReporter.logReport();
+    }
+
+    private void configureEstimators() {
+        StatisticsProvider statsProvider = indexHelper.getStatisticsProvider();
+        if (statsProvider instanceof MetricStatisticsProvider) {
+            MetricRegistry registry = ((MetricStatisticsProvider) statsProvider).getRegistry();
+            progressReporter.setTraversalRateEstimator(new MetricRateEstimator("async", registry));
+        }
+
+        MongoConnection mongoConnection = indexHelper.getService(MongoConnection.class);
+        if (mongoConnection != null) {
+            long nodesCount = mongoConnection.getDB().getCollection("nodes").count();
+            progressReporter.setNodeCountEstimator((String basePath, Set<String> indexPaths) -> nodesCount);
+            log.info("Estimated number of documents in Mongo are {}", nodesCount);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        closer.close();
+    }
+
+    @SuppressWarnings("Guava")
+    private Iterable<NodeDocument> getIncludedDocs(CompositeIndexer indexer) {
+        return FluentIterable.from(getDocsFilteredByPath(indexer))
+                .filter(doc -> !doc.isSplitDocument())
+                .filter(indexer::shouldInclude);
+    }
+
+    private Iterable<NodeDocument> getDocsFilteredByPath(CompositeIndexer indexer) {
+        CloseableIterable<NodeDocument> docs = findAllDocuments(indexer);
+        closer.register(docs);
+        return docs;
+    }
+
+    private CloseableIterable<NodeDocument> findAllDocuments(CompositeIndexer indexer) {
+        MongoDocumentStore mds = indexHelper.getService(MongoDocumentStore.class);
+        checkNotNull(mds);
+        return new MongoDocumentTraverser(mds).getAllDocuments(Collection.NODES, id -> includeId(id, indexer));
+    }
+
+    private boolean includeId(String id, NodeStateIndexer indexer) {
+        readNodeDocument(id);
+        //Cannot interpret long paths as they are hashed. So let them
+        //be included
+        if (Utils.isIdFromLongPath(id)){
+            return true;
+        }
+
+        //Not easy to determine path for previous docs
+        //Given there count is pretty low compared to others
+        //include them all
+        if (Utils.isPreviousDocId(id)){
+            return true;
+        }
+
+        String path = Utils.getPathFromId(id);
+
+        //Exclude hidden nodes from index data
+        if (NodeStateUtils.isHiddenPath(path)){
+            return false;
+        }
+
+        return indexer.shouldInclude(path);
+    }
+
+    private void readNodeDocument(String id) {
+        try {
+            progressReporter.traversedNode(() -> id);
+        } catch (CommitFailedException e) {
+            throw new RuntimeException(e);
+        }
+        traversalLog.trace(id);
+    }
+
+    private CompositeIndexer prepareIndexers() {
+        NodeState root = indexHelper.getNodeStore().getRoot();
+        List<NodeStateIndexer> indexers = new ArrayList<>();
+        for (String indexPath : indexHelper.getIndexPaths()) {
+            NodeState indexState = NodeStateUtils.getNode(root, indexPath);
+            String type = indexState.getString(TYPE_PROPERTY_NAME);
+            if (type == null) {
+                log.warn("No 'type' property found on indexPath [{}]. Skipping it", indexPath);
+                continue;
+            }
+            for (NodeStateIndexerProvider indexerProvider : indexerProviders) {
+                NodeStateIndexer indexer = indexerProvider.getIndexer(type, indexPath, indexState, root, progressReporter);
+                if (indexer != null) {
+                    indexers.add(indexer);
+                    closer.register(indexer);
+                    progressReporter.registerIndex(indexPath, true, -1);
+                }
+            }
+        }
+
+        return new CompositeIndexer(indexers);
+    }
+
+    private List<NodeStateIndexerProvider> createProviders() throws IOException {
+        List<NodeStateIndexerProvider> providers = ImmutableList.of(
+          createLuceneIndexProvider()
+        );
+
+        providers.forEach(closer::register);
+        return providers;
+    }
+
+    private NodeStateIndexerProvider createLuceneIndexProvider() throws IOException {
+        return new LuceneIndexerProvider(indexHelper, indexerSupport);
+    }
+}
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/LuceneIndexer.java oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/LuceneIndexer.java
new file mode 100644
index 0000000..1834193
--- /dev/null
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/LuceneIndexer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index.indexer.path;
+
+import java.io.IOException;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.index.PathFilter;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexDefinition.IndexingRule;
+import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneDocumentMaker;
+import org.apache.jackrabbit.oak.plugins.index.lucene.binary.BinaryTextExtractor;
+import org.apache.jackrabbit.oak.plugins.index.lucene.util.FacetHelper;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
+import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.lucene.document.Document;
+
+public class LuceneIndexer implements NodeStateIndexer {
+    private final IndexDefinition definition;
+    private final BinaryTextExtractor binaryTextExtractor;
+    private final NodeBuilder definitionBuilder;
+    private final LuceneIndexWriter indexWriter;
+    private final IndexingProgressReporter progressReporter;
+
+    public LuceneIndexer(IndexDefinition definition, LuceneIndexWriter indexWriter,
+                         NodeBuilder builder, BinaryTextExtractor binaryTextExtractor,
+                         IndexingProgressReporter progressReporter) {
+        this.definition = definition;
+        this.binaryTextExtractor = binaryTextExtractor;
+        this.indexWriter = indexWriter;
+        this.definitionBuilder = builder;
+        this.progressReporter = progressReporter;
+    }
+
+    @Override
+    public boolean shouldInclude(String path) {
+        return definition.getPathFilter().filter(path) != PathFilter.Result.EXCLUDE;
+    }
+
+    @Override
+    public boolean shouldInclude(NodeDocument doc) {
+        //TODO possible optimization for NodeType based filtering
+        return true;
+    }
+
+    @Override
+    public void index(NodeStateEntry entry) throws IOException, CommitFailedException {
+        IndexingRule indexingRule = definition.getApplicableIndexingRule(entry.getNodeState());
+
+        if (indexingRule == null) {
+            return;
+        }
+
+        LuceneDocumentMaker maker = newDocumentMaker(indexingRule, entry.getPath());
+        Document doc = maker.makeDocument(entry.getNodeState());
+        if (doc != null) {
+            writeToIndex(doc, entry.getPath());
+            progressReporter.indexUpdate(definition.getIndexPath());
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        indexWriter.close(System.currentTimeMillis());
+    }
+
+    private void writeToIndex(Document doc, String path) throws IOException {
+        indexWriter.updateDocument(path, doc);
+    }
+
+    private LuceneDocumentMaker newDocumentMaker(IndexingRule indexingRule, String path) {
+        return new LuceneDocumentMaker(
+                binaryTextExtractor,
+                () -> FacetHelper.getFacetsConfig(definitionBuilder), //TODO FacetsConfig handling
+                null,   //TODO augmentorFactory
+                definition,
+                indexingRule,
+                path
+        );
+    }
+}
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/LuceneIndexerProvider.java oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/LuceneIndexerProvider.java
new file mode 100644
index 0000000..5614265
--- /dev/null
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/LuceneIndexerProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index.indexer.path;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.index.IndexHelper;
+import org.apache.jackrabbit.oak.index.IndexerSupport;
+import org.apache.jackrabbit.oak.plugins.index.lucene.ExtractedTextCache;
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.lucene.binary.BinaryTextExtractor;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.DirectoryFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.FSDirectoryFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.DefaultIndexWriterFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
+import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriterFactory;
+import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE;
+
+public class LuceneIndexerProvider implements NodeStateIndexerProvider {
+    private final ExtractedTextCache textCache =
+            new ExtractedTextCache(FileUtils.ONE_MB * 5, TimeUnit.HOURS.toSeconds(5));
+    private final IndexHelper indexHelper;
+    private final DirectoryFactory dirFactory;
+    private final LuceneIndexWriterFactory indexWriterFactory;
+
+    public LuceneIndexerProvider(IndexHelper indexHelper, IndexerSupport indexerSupport) throws IOException {
+        this.indexHelper = indexHelper;
+        this.dirFactory = new FSDirectoryFactory(indexerSupport.getLocalIndexDir());
+        this.indexWriterFactory = new DefaultIndexWriterFactory(indexHelper.getMountInfoProvider(), dirFactory);
+    }
+
+    @Override
+    public NodeStateIndexer getIndexer(@Nonnull String type, @Nonnull String indexPath,
+                                       @Nonnull NodeState definition, @Nonnull NodeState root,
+                                       IndexingProgressReporter progressReporter) {
+        if (!TYPE_LUCENE.equals(definition.getString(TYPE_PROPERTY_NAME))) {
+            return null;
+        }
+
+        NodeBuilder builder = definition.builder();
+        IndexDefinition idxDefinition = IndexDefinition.newBuilder(root, definition, indexPath).reindex().build();
+
+        LuceneIndexWriter indexWriter = indexWriterFactory.newInstance(idxDefinition, builder, true);
+        BinaryTextExtractor textExtractor = new BinaryTextExtractor(textCache, idxDefinition, true);
+        return new LuceneIndexer(
+                idxDefinition,
+                indexWriter,
+                builder,
+                textExtractor,
+                progressReporter
+        );
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+}
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/NodeStateEntry.java oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/NodeStateEntry.java
new file mode 100644
index 0000000..f7be28c
--- /dev/null
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/NodeStateEntry.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index.indexer.path;
+
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+public class NodeStateEntry {
+    private final NodeState nodeState;
+    private final String path;
+
+    public NodeStateEntry(NodeState nodeState, String path) {
+        this.nodeState = nodeState;
+        this.path = path;
+    }
+
+    public NodeState getNodeState() {
+        return nodeState;
+    }
+
+    public String getPath() {
+        return path;
+    }
+}
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/NodeStateIndexer.java oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/NodeStateIndexer.java
new file mode 100644
index 0000000..675db73
--- /dev/null
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/NodeStateIndexer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index.indexer.path;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+
+public interface NodeStateIndexer extends Closeable{
+
+    boolean shouldInclude(String path);
+
+    boolean shouldInclude(NodeDocument doc);
+
+    void index(NodeStateEntry entry) throws IOException, CommitFailedException;
+}
+
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/NodeStateIndexerProvider.java oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/NodeStateIndexerProvider.java
new file mode 100644
index 0000000..16e9b40
--- /dev/null
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/index/indexer/path/NodeStateIndexerProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index.indexer.path;
+
+import java.io.Closeable;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+public interface NodeStateIndexerProvider extends Closeable {
+
+    @CheckForNull
+    NodeStateIndexer getIndexer(@Nonnull String type,
+                                @Nonnull String indexPath,
+                                @Nonnull NodeState definition,
+                                @Nonnull NodeState root, IndexingProgressReporter progressReporter);
+}
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/run/cli/CommonOptions.java oak-run/src/main/java/org/apache/jackrabbit/oak/run/cli/CommonOptions.java
index 17eccc3..58788a6 100644
--- oak-run/src/main/java/org/apache/jackrabbit/oak/run/cli/CommonOptions.java
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/run/cli/CommonOptions.java
@@ -70,7 +70,11 @@ public class CommonOptions implements OptionsBean {
     }
 
     public boolean isSegment(){
-        return !isMongo() && !isRDB();
+        return !isDocument();
+    }
+
+    public boolean isDocument(){
+        return isMongo() || isRDB();
     }
 
     public boolean isMetricsEnabled() {
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/run/cli/NodeStoreFixtureProvider.java oak-run/src/main/java/org/apache/jackrabbit/oak/run/cli/NodeStoreFixtureProvider.java
index e690d41..48b7195 100644
--- oak-run/src/main/java/org/apache/jackrabbit/oak/run/cli/NodeStoreFixtureProvider.java
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/run/cli/NodeStoreFixtureProvider.java
@@ -35,7 +35,11 @@ import com.google.common.io.Closer;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.mongodb.MongoClientURI;
 import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
+import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDataSourceFactory;
+import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentStore;
 import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
 import org.apache.jackrabbit.oak.plugins.metric.MetricStatisticsProvider;
 import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
@@ -49,6 +53,7 @@ import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 
 import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
+import static java.util.Collections.emptyMap;
 import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
 
 public class NodeStoreFixtureProvider {
@@ -70,7 +75,7 @@ public class NodeStoreFixtureProvider {
         }
 
         StatisticsProvider statisticsProvider = createStatsProvider(options, wb, closer);
-        wb.register(StatisticsProvider.class, statisticsProvider, Collections.emptyMap());
+        wb.register(StatisticsProvider.class, statisticsProvider, emptyMap());
 
         NodeStore store;
         if (commonOpts.isMongo() || commonOpts.isRDB()) {
@@ -121,6 +126,7 @@ public class NodeStoreFixtureProvider {
             );
         }
 
+        DocumentNodeStore dns = null;
         if (commonOpts.isMongo()) {
             MongoClientURI uri = new MongoClientURI(commonOpts.getStoreArg());
             if (uri.getDatabase() == null) {
@@ -129,18 +135,21 @@ public class NodeStoreFixtureProvider {
                 System.exit(1);
             }
             MongoConnection mongo = new MongoConnection(uri.getURI());
-            wb.register(MongoConnection.class, mongo, Collections.emptyMap());
+            wb.register(MongoConnection.class, mongo, emptyMap());
             closer.register(mongo::close);
             builder.setMongoDB(mongo.getDB());
+            dns = builder.getNodeStore();
+            wb.register(MongoDocumentStore.class, (MongoDocumentStore) builder.getDocumentStore(), emptyMap());
         } else if (commonOpts.isRDB()) {
             RDBStoreOptions rdbOpts = options.getOptionBean(RDBStoreOptions.class);
             DataSource ds = RDBDataSourceFactory.forJdbcUrl(commonOpts.getStoreArg(),
                     rdbOpts.getUser(), rdbOpts.getPassword());
-            wb.register(DataSource.class, ds, Collections.emptyMap());
+            wb.register(DataSource.class, ds, emptyMap());
             builder.setRDBConnection(ds);
+            dns = builder.getNodeStore();
+            wb.register(RDBDocumentStore.class, (RDBDocumentStore) builder.getDocumentStore(), emptyMap());
         }
-
-        return builder.getNodeStore();
+        return dns;
     }
 
     private static NodeStore configureSegment(Options options, BlobStore blobStore, StatisticsProvider statisticsProvider, Closer closer, boolean readOnly)
@@ -178,7 +187,7 @@ public class NodeStoreFixtureProvider {
             MetricStatisticsProvider statsProvider = new MetricStatisticsProvider(getPlatformMBeanServer(), executorService);
             closer.register(statsProvider);
             closer.register(() -> reportMetrics(statsProvider));
-            wb.register(MetricRegistry.class, statsProvider.getRegistry(), Collections.emptyMap());
+            wb.register(MetricRegistry.class, statsProvider.getRegistry(), emptyMap());
             return statsProvider;
         }
         return StatisticsProvider.NOOP;
diff --git oak-run/src/test/java/org/apache/jackrabbit/oak/index/IndexCommandIT.java oak-run/src/test/java/org/apache/jackrabbit/oak/index/IndexCommandIT.java
index 2cb3d01..86bc019 100644
--- oak-run/src/test/java/org/apache/jackrabbit/oak/index/IndexCommandIT.java
+++ oak-run/src/test/java/org/apache/jackrabbit/oak/index/IndexCommandIT.java
@@ -20,14 +20,9 @@
 package org.apache.jackrabbit.oak.index;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import javax.jcr.Node;
-import javax.jcr.PropertyType;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
 
 import com.google.common.io.Files;
 import org.apache.jackrabbit.oak.api.PropertyState;
@@ -35,15 +30,10 @@ import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.lucene.directory.IndexRootDirectory;
 import org.apache.jackrabbit.oak.plugins.index.lucene.directory.LocalIndexDir;
-import org.apache.jackrabbit.oak.plugins.index.lucene.util.IndexDefinitionBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.junit.After;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 import static java.nio.charset.Charset.defaultCharset;
-import static org.apache.jackrabbit.commons.JcrUtils.getOrCreateByPath;
 import static org.apache.jackrabbit.oak.spi.state.NodeStateUtils.getNode;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.not;
@@ -52,18 +42,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
-public class IndexCommandIT {
-
-    @Rule
-    public final TemporaryFolder temporaryFolder = new TemporaryFolder(new File("target"));
-    private RepositoryFixture fixture;
-
-    @After
-    public void cleaup() throws IOException {
-        if (fixture != null) {
-            fixture.close();
-        }
-    }
+public class IndexCommandIT extends AbstractIndexCommandTest {
 
     @Test
     public void dumpStatsAndInfo() throws Exception{
@@ -232,7 +211,7 @@ public class IndexCommandIT {
         PropertyState reindexCount = getNode(store2.getRoot(), "/oak:index/fooIndex").getProperty(IndexConstants.REINDEX_COUNT);
         assertEquals(1, reindexCount.getValue(Type.LONG).longValue());
 
-        File indexes = new File(outDir, OutOfBandIndexer.LOCAL_INDEX_ROOT_DIR);
+        File indexes = new File(outDir, IndexerSupport.LOCAL_INDEX_ROOT_DIR);
         assertTrue(indexes.exists());
 
         IndexRootDirectory idxRoot = new IndexRootDirectory(indexes);
@@ -241,46 +220,4 @@ public class IndexCommandIT {
         assertEquals(1, idxDirs.size());
     }
 
-    private void createTestData(boolean asyncIndex) throws IOException, RepositoryException {
-        fixture = new RepositoryFixture(temporaryFolder.newFolder());
-        indexIndexDefinitions();
-        createLuceneIndex(asyncIndex);
-        addTestContent();
-    }
-
-    private void indexIndexDefinitions() throws IOException, RepositoryException {
-        //By default Oak index definitions are not indexed
-        //so add them to declaringNodeTypes
-        Session session = fixture.getAdminSession();
-        Node nodeType = session.getNode("/oak:index/nodetype");
-        nodeType.setProperty(IndexConstants.DECLARING_NODE_TYPES, new String[] {"oak:QueryIndexDefinition"}, PropertyType.NAME);
-        session.save();
-        session.logout();
-    }
-
-    private void addTestContent() throws IOException, RepositoryException {
-        Session session = fixture.getAdminSession();
-        for (int i = 0; i < 100; i++) {
-            getOrCreateByPath("/testNode/a"+i,
-                    "oak:Unstructured", session).setProperty("foo", "bar");
-        }
-        session.save();
-        session.logout();
-    }
-
-    private void createLuceneIndex(boolean asyncIndex) throws IOException, RepositoryException {
-        IndexDefinitionBuilder idxBuilder = new IndexDefinitionBuilder();
-        if (!asyncIndex) {
-            idxBuilder.noAsync();
-        }
-        idxBuilder.indexRule("nt:base").property("foo").propertyIndex();
-
-        Session session = fixture.getAdminSession();
-        Node fooIndex = getOrCreateByPath("/oak:index/fooIndex",
-                "oak:QueryIndexDefinition", session);
-
-        idxBuilder.build(fooIndex);
-        session.save();
-        session.logout();
-    }
 }
\ No newline at end of file
diff --git oak-run/src/test/java/org/apache/jackrabbit/oak/index/RepositoryFixture.java oak-run/src/test/java/org/apache/jackrabbit/oak/index/RepositoryFixture.java
index 893c141..f8583c2 100644
--- oak-run/src/test/java/org/apache/jackrabbit/oak/index/RepositoryFixture.java
+++ oak-run/src/test/java/org/apache/jackrabbit/oak/index/RepositoryFixture.java
@@ -54,7 +54,12 @@ public class RepositoryFixture implements Closeable {
     private Whiteboard whiteboard;
 
     public RepositoryFixture(File dir) {
+        this(dir, null);
+    }
+
+    public RepositoryFixture(File dir, NodeStore nodeStore) {
         this.dir = dir;
+        this.nodeStore = nodeStore;
     }
 
     public Repository getRepository() throws IOException {
