diff --git oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
index 6dbd696..c347845 100644
--- oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
+++ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
@@ -47,6 +47,14 @@ public interface CopyOnReadStatsMBean {
 
     long getDownloadTime();
 
+    int getDownloadCount();
+
+    String getUploadSize();
+
+    long getUploadTime();
+
+    int getUploadCount();
+
     String getLocalIndexSize();
 
     String[] getGarbageDetails();
diff --git oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
index 52e6202..63342df 100644
--- oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
+++ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
@@ -20,18 +20,25 @@
 package org.apache.jackrabbit.oak.plugins.index.lucene;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.management.openmbean.CompositeDataSupport;
 import javax.management.openmbean.CompositeType;
@@ -45,13 +52,13 @@ import javax.management.openmbean.TabularType;
 import com.google.common.base.Charsets;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.commons.IOUtils;
-import org.apache.lucene.store.BaseDirectory;
+import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
+import org.apache.jackrabbit.oak.util.PerfLogger;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.FilterDirectory;
@@ -66,13 +73,17 @@ import static com.google.common.collect.Iterables.toArray;
 import static com.google.common.collect.Iterables.transform;
 import static com.google.common.collect.Maps.newConcurrentMap;
 
-class IndexCopier implements CopyOnReadStatsMBean {
+public class IndexCopier implements CopyOnReadStatsMBean {
     private static final Set<String> REMOTE_ONLY = ImmutableSet.of("segments.gen");
     private static final int MAX_FAILURE_ENTRIES = 10000;
+    private static final AtomicInteger UNIQUE_COUNTER = new AtomicInteger();
+    private static final String WORK_DIR_NAME = "indexWriterDir";
 
     private final Logger log = LoggerFactory.getLogger(getClass());
+    private final PerfLogger PERF_LOGGER = new PerfLogger(LoggerFactory.getLogger(log.getName() + ".perf"));
     private final Executor executor;
     private final File indexRootDir;
+    private final File indexWorkDir;
 
     private final AtomicInteger localReadCount = new AtomicInteger();
     private final AtomicInteger remoteReadCount = new AtomicInteger();
@@ -82,35 +93,66 @@ class IndexCopier implements CopyOnReadStatsMBean {
     private final AtomicInteger copyInProgressCount = new AtomicInteger();
     private final AtomicInteger maxCopyInProgressCount = new AtomicInteger();
     private final AtomicInteger maxScheduledForCopyCount = new AtomicInteger();
+    private final AtomicInteger uploadCount = new AtomicInteger();
+    private final AtomicInteger downloadCount = new AtomicInteger();
     private final AtomicLong copyInProgressSize = new AtomicLong();
     private final AtomicLong downloadSize = new AtomicLong();
+    private final AtomicLong uploadSize = new AtomicLong();
     private final AtomicLong garbageCollectedSize = new AtomicLong();
     private final AtomicLong downloadTime = new AtomicLong();
+    private final AtomicLong uploadTime = new AtomicLong();
 
 
-    private final Map<String, String> indexPathMapping = Maps.newConcurrentMap();
-    private final Map<String, String> indexPathVersionMapping = Maps.newConcurrentMap();
-    private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = Maps.newConcurrentMap();
+    private final Map<String, String> indexPathMapping = newConcurrentMap();
+    private final Map<String, String> indexPathVersionMapping = newConcurrentMap();
+    private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = newConcurrentMap();
     private final Set<LocalIndexFile> copyInProgressFiles = Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
 
-    public IndexCopier(Executor executor, File indexRootDir) {
+    public IndexCopier(Executor executor, File indexRootDir) throws IOException {
         this.executor = executor;
         this.indexRootDir = indexRootDir;
+        this.indexWorkDir = initializerWorkDir(indexRootDir);
     }
 
-    public Directory wrap(String indexPath, IndexDefinition definition, Directory remote) throws IOException {
-        Directory local = createLocalDir(indexPath, definition);
+    public Directory wrapForRead(String indexPath, IndexDefinition definition, Directory remote) throws IOException {
+        Directory local = createLocalDirForIndexReader(indexPath, definition);
         return new CopyOnReadDirectory(remote, local);
     }
 
-    protected Directory createLocalDir(String indexPath, IndexDefinition definition) throws IOException {
+    public Directory wrapForWrite(IndexDefinition definition, Directory remote ) throws IOException {
+        Directory local = createLocalDirForIndexWriter(definition);
+        return new CopyOnWriteDirectory(remote, local);
+    }
+
+    protected Directory createLocalDirForIndexWriter(IndexDefinition definition) throws IOException {
+        String indexPath = definition.getIndexPathFromConfig();
+        File indexWriterDir;
+        if (indexPath == null){
+            //If indexPath is not known create a unique directory for work
+            indexWriterDir = new File(indexWorkDir, String.valueOf(UNIQUE_COUNTER.incrementAndGet()));
+        } else {
+            File indexDir = getIndexDir(indexPath);
+            String newVersion = String.valueOf(definition.getReindexCount());
+            indexWriterDir = getVersionedDir(indexPath, indexDir, newVersion);
+        }
+        Directory dir = FSDirectory.open(indexWriterDir);
+
+        log.debug("IndexWriter would use {}", indexWriterDir);
+
+        if (indexPath == null) {
+            dir = new DeleteOldDirOnClose(dir, indexWriterDir);
+            log.debug("IndexPath [{}] not configured in index definition {}. Writer would create index " +
+                    "files in temporary dir {} which would be deleted upon close. For better performance do " +
+                    "configure the indexPath as part of your index definition", LuceneIndexConstants.INDEX_PATH,
+                    definition, indexWriterDir);
+        }
+        return dir;
+    }
+
+    protected Directory createLocalDirForIndexReader(String indexPath, IndexDefinition definition) throws IOException {
         File indexDir = getIndexDir(indexPath);
         String newVersion = String.valueOf(definition.getReindexCount());
-        File versionedIndexDir = new File(indexDir, newVersion);
-        if (!versionedIndexDir.exists()) {
-            checkState(versionedIndexDir.mkdirs(), "Cannot create directory %s", versionedIndexDir);
-        }
-        indexPathMapping.put(indexPath, indexDir.getAbsolutePath());
+        File versionedIndexDir = getVersionedDir(indexPath, indexDir, newVersion);
         Directory result = FSDirectory.open(versionedIndexDir);
 
         String oldVersion = indexPathVersionMapping.put(indexPath, newVersion);
@@ -120,6 +162,15 @@ class IndexCopier implements CopyOnReadStatsMBean {
         return result;
     }
 
+    private File getVersionedDir(String indexPath, File indexDir, String newVersion) {
+        File versionedIndexDir = new File(indexDir, newVersion);
+        if (!versionedIndexDir.exists()) {
+            checkState(versionedIndexDir.mkdirs(), "Cannot create directory %s", versionedIndexDir);
+        }
+        indexPathMapping.put(indexPath, indexDir.getAbsolutePath());
+        return versionedIndexDir;
+    }
+
     public File getIndexDir(String indexPath) {
         String subDir = Hashing.sha256().hashString(indexPath, Charsets.UTF_8).toString();
         return new File(indexRootDir, subDir);
@@ -156,57 +207,51 @@ class IndexCopier implements CopyOnReadStatsMBean {
     }
 
     /**
+     * Creates the workDir. If it exists then it is cleaned
+     *
+     * @param indexRootDir root directory under which all indexing related files are managed
+     * @return work directory. Always empty
+     */
+    private static File initializerWorkDir(File indexRootDir) throws IOException {
+        File workDir = new File(indexRootDir, WORK_DIR_NAME);
+        FileUtils.deleteDirectory(workDir);
+        checkState(workDir.mkdirs(), "Cannot create directory %s", workDir);
+        return workDir;
+    }
+
+    /**
      * Directory implementation which lazily copies the index files from a
      * remote directory in background.
      */
-    private class CopyOnReadDirectory extends BaseDirectory {
+    private class CopyOnReadDirectory extends FilterDirectory {
         private final Directory remote;
         private final Directory local;
 
-        private final ConcurrentMap<String, FileReference> files = newConcurrentMap();
+        private final ConcurrentMap<String, CORFileReference> files = newConcurrentMap();
 
         public CopyOnReadDirectory(Directory remote, Directory local) throws IOException {
+            super(remote);
             this.remote = remote;
             this.local = local;
         }
 
         @Override
-        public String[] listAll() throws IOException {
-            return remote.listAll();
-        }
-
-        @Override
-        public boolean fileExists(String name) throws IOException {
-            return remote.fileExists(name);
-        }
-
-        @Override
         public void deleteFile(String name) throws IOException {
             throw new UnsupportedOperationException("Cannot delete in a ReadOnly directory");
         }
 
         @Override
-        public long fileLength(String name) throws IOException {
-            return remote.fileLength(name);
-        }
-
-        @Override
         public IndexOutput createOutput(String name, IOContext context) throws IOException {
             throw new UnsupportedOperationException("Cannot write in a ReadOnly directory");
         }
 
         @Override
-        public void sync(Collection<String> names) throws IOException {
-            remote.sync(names);
-        }
-
-        @Override
         public IndexInput openInput(String name, IOContext context) throws IOException {
             if (REMOTE_ONLY.contains(name)) {
                 return remote.openInput(name, context);
             }
 
-            FileReference ref = files.get(name);
+            CORFileReference ref = files.get(name);
             if (ref != null) {
                 if (ref.isLocalValid()) {
                     return files.get(name).openLocalInput(context);
@@ -216,8 +261,8 @@ class IndexCopier implements CopyOnReadStatsMBean {
                 }
             }
 
-            FileReference toPut = new FileReference(name);
-            FileReference old = files.putIfAbsent(name, toPut);
+            CORFileReference toPut = new CORFileReference(name);
+            CORFileReference old = files.putIfAbsent(name, toPut);
             if (old == null) {
                 copy(toPut);
             }
@@ -230,7 +275,7 @@ class IndexCopier implements CopyOnReadStatsMBean {
             return remote.openInput(name, context);
         }
 
-        private void copy(final FileReference reference) {
+        private void copy(final CORFileReference reference) {
             updateMaxScheduled(scheduledForCopyCount.incrementAndGet());
             executor.execute(new Runnable() {
                 @Override
@@ -242,7 +287,7 @@ class IndexCopier implements CopyOnReadStatsMBean {
                         scheduledForCopyCount.decrementAndGet();
                         if (!local.fileExists(name)) {
                             long fileSize = remote.fileLength(name);
-                            LocalIndexFile file = new LocalIndexFile(local, name, fileSize);
+                            LocalIndexFile file = new LocalIndexFile(local, name, fileSize, true);
                             long start = startCopy(file);
                             copyAttempted = true;
 
@@ -365,11 +410,11 @@ class IndexCopier implements CopyOnReadStatsMBean {
             }
         }
 
-        private class FileReference {
+        private class CORFileReference {
             final String name;
             private volatile boolean valid;
 
-            private FileReference(String name) {
+            private CORFileReference(String name) {
                 this.name = name;
             }
 
@@ -388,6 +433,397 @@ class IndexCopier implements CopyOnReadStatsMBean {
         }
     }
 
+    private class CopyOnWriteDirectory extends FilterDirectory {
+        /**
+         * Signal for the background thread to stop processing changes.
+         */
+        private final Callable<Void> STOP = new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                return null;
+            }
+        };
+        private final Directory remote;
+        private final Directory local;
+        private int remoteReadCount;
+        private int localReadCount;
+        private final ConcurrentMap<String, COWFileReference> fileMap = newConcurrentMap();
+        private final Set<String> deletedFilesLocal = Sets.newConcurrentHashSet();
+        private final Set<String> skippedFiles = Sets.newConcurrentHashSet();
+
+        private final BlockingQueue<Callable<Void>> queue = new LinkedBlockingQueue<Callable<Void>>();
+        private final AtomicReference<Throwable> errorInCopy = new AtomicReference<Throwable>();
+        private final CountDownLatch copyDone = new CountDownLatch(1);
+
+        /**
+         * Current background task
+         */
+        private volatile NotifyingFutureTask currentTask =  NotifyingFutureTask.completed();
+
+        /**
+         * Completion handler: set the current task to the next task and schedules that one
+         * on the background thread.
+         */
+        private final Runnable completionHandler = new Runnable() {
+            Callable<Void> task = new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    try {
+                        Callable<Void> task = queue.poll();
+                        if (task != null && task != STOP) {
+                            if (errorInCopy.get() != null) {
+                                log.trace("Skipping task {} as some exception occurred in previous run", task);
+                            } else {
+                                task.call();
+                            }
+                            currentTask.onComplete(completionHandler);
+                        }
+
+                        //Signal that all tasks completed
+                        if (task == STOP){
+                            copyDone.countDown();
+                        }
+                    } catch (Throwable t) {
+                        errorInCopy.set(t);
+                        log.debug("Error occurred while copying files. Further processing would be skipped", t);
+                        currentTask.onComplete(completionHandler);
+                    }
+                    return null;
+                }
+            };
+
+            @Override
+            public void run() {
+                currentTask = new NotifyingFutureTask(task);
+                executor.execute(currentTask);
+            }
+        };
+
+        public CopyOnWriteDirectory(Directory remote, Directory local) throws IOException {
+            super(local);
+            this.remote = remote;
+            this.local = local;
+            initialize();
+        }
+
+        @Override
+        public String[] listAll() throws IOException {
+            return Iterables.toArray(fileMap.keySet(), String.class);
+        }
+
+        @Override
+        public boolean fileExists(String name) throws IOException {
+            return fileMap.containsKey(name);
+        }
+
+        @Override
+        public void deleteFile(String name) throws IOException {
+            log.trace("[COW] Deleted file {}", name);
+            COWFileReference ref = fileMap.remove(name);
+            if (ref != null) {
+                ref.delete();
+            }
+        }
+
+        @Override
+        public long fileLength(String name) throws IOException {
+            COWFileReference ref = fileMap.get(name);
+            if (ref == null) {
+                throw new FileNotFoundException(name);
+            }
+            return ref.fileLength();
+        }
+
+        @Override
+        public IndexOutput createOutput(String name, IOContext context) throws IOException {
+            COWFileReference ref = fileMap.remove(name);
+            if (ref != null) {
+                ref.delete();
+            }
+            ref = new COWLocalFileReference(name);
+            fileMap.put(name, ref);
+            return ref.createOutput(context);
+        }
+
+        @Override
+        public void sync(Collection<String> names) throws IOException {
+            for (String name : names){
+                COWFileReference file = fileMap.get(name);
+                if (file != null){
+                    file.sync();
+                }
+            }
+        }
+
+        @Override
+        public IndexInput openInput(String name, IOContext context) throws IOException {
+            COWFileReference ref = fileMap.get(name);
+            if (ref == null) {
+                throw new FileNotFoundException(name);
+            }
+            return ref.openInput(context);
+        }
+
+        @Override
+        public void close() throws IOException {
+            int pendingCopies = queue.size();
+            addTask(STOP);
+
+            //Wait for all pending copy task to finish
+            try {
+                long start = PERF_LOGGER.start();
+                copyDone.await();
+                PERF_LOGGER.end(start, -1, "Completed pending copying task {}", pendingCopies);
+            } catch (InterruptedException e) {
+                throw new IOException(e);
+            }
+
+            Throwable t = errorInCopy.get();
+            if (t != null){
+                throw new IOException("Error occurred while copying files", t);
+            }
+
+            for (String fileName : deletedFilesLocal){
+                deleteLocalFile(fileName);
+            }
+
+            log.debug("Local read count {}, remote read count {}, Skipped copying {}", localReadCount,
+                    remoteReadCount, skippedFiles.size());
+
+            if (log.isTraceEnabled()){
+                log.trace("File listing - Upon completion" + Arrays.toString(remote.listAll()));
+            }
+
+            local.close();
+            remote.close();
+        }
+
+        private void deleteLocalFile(String file) {
+            try {
+                local.deleteFile(file);
+            } catch (IOException e) {
+                log.debug("Error occurred while deleting file {} from local {}", file, local, e);
+            }
+        }
+
+        private void initialize() throws IOException {
+            for (String name : remote.listAll()) {
+                fileMap.put(name, new COWRemoteFileReference(name));
+            }
+
+            if (log.isTraceEnabled()){
+                log.trace("File listing - Start" + Arrays.toString(remote.listAll()));
+            }
+        }
+
+        private void addCopyTask(final String name){
+            addTask(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    if (deletedFilesLocal.contains(name)){
+                        skippedFiles.add(name);
+                        log.trace("[COW] Skip copying of deleted file {}", name);
+                        return null;
+                    }
+                    long fileSize = local.fileLength(name);
+                    LocalIndexFile file = new LocalIndexFile(local, name, fileSize, false);
+                    long perfStart = PERF_LOGGER.start();
+                    long start = startCopy(file);
+
+                    local.copy(remote, name, name, IOContext.DEFAULT);
+
+                    doneCopy(file, start);
+                    PERF_LOGGER.end(perfStart, 0, "Copied to remote {} ", name);
+                    return null;
+                }
+
+                @Override
+                public String toString() {
+                    return "Copy: " + name;
+                }
+            });
+        }
+
+        private void addDeleteTask(final String name){
+            addTask(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    if (!skippedFiles.contains(name)) {
+                        log.trace("[COW] Marking as deleted {}", name);
+                        remote.deleteFile(name);
+                    } else {
+
+                    }
+                    return null;
+                }
+
+                @Override
+                public String toString() {
+                    return "Delete : " + name;
+                }
+            });
+        }
+
+        private void addTask(Callable<Void> task){
+            queue.add(task);
+            currentTask.onComplete(completionHandler);
+        }
+
+        private abstract class COWFileReference {
+            protected final String name;
+
+            public COWFileReference(String name) {
+                this.name = name;
+            }
+
+            public abstract long fileLength() throws IOException;
+
+            public abstract IndexInput openInput(IOContext context) throws IOException;
+
+            public abstract IndexOutput createOutput(IOContext context) throws IOException;
+
+            public abstract void delete() throws IOException;
+
+            public void sync() throws IOException {
+
+            }
+        }
+
+        private class COWRemoteFileReference extends COWFileReference {
+            private boolean validLocalCopyPresent;
+            private final long length;
+
+            public COWRemoteFileReference(String name) throws IOException {
+                super(name);
+                this.length = remote.fileLength(name);
+            }
+
+            @Override
+            public long fileLength() throws IOException {
+                return length;
+            }
+
+            @Override
+            public IndexInput openInput(IOContext context) throws IOException {
+                checkIfLocalValid();
+                if (validLocalCopyPresent && !REMOTE_ONLY.contains(name)) {
+                    localReadCount++;
+                    return local.openInput(name, context);
+                }
+                remoteReadCount++;
+                return remote.openInput(name, context);
+            }
+
+            @Override
+            public IndexOutput createOutput(IOContext context) throws IOException {
+                throw new UnsupportedOperationException("Cannot create output for existing remote file " + name);
+            }
+
+            @Override
+            public void delete() throws IOException {
+                //Remote file should not be deleted locally as it might be
+                //in use by existing opened IndexSearcher. It would anyway
+                //get deleted by CopyOnRead later
+                //For now just record that these need to be deleted to avoid
+                //potential concurrent access of the NodeBuilder
+                addDeleteTask(name);
+            }
+
+            private void checkIfLocalValid() throws IOException {
+                validLocalCopyPresent = local.fileExists(name)
+                        && local.fileLength(name) == remote.fileLength(name);
+            }
+        }
+
+        private class COWLocalFileReference extends COWFileReference {
+            public COWLocalFileReference(String name) {
+                super(name);
+            }
+
+            @Override
+            public long fileLength() throws IOException {
+                return local.fileLength(name);
+            }
+
+            @Override
+            public IndexInput openInput(IOContext context) throws IOException {
+                return local.openInput(name, context);
+            }
+
+            @Override
+            public IndexOutput createOutput(IOContext context) throws IOException {
+                log.debug("[COW] Creating output {}", name);
+                return new CopyOnCloseIndexOutput(local.createOutput(name, context));
+            }
+
+            @Override
+            public void delete() throws IOException {
+                addDeleteTask(name);
+                deletedFilesLocal.add(name);
+            }
+
+            @Override
+            public void sync() throws IOException {
+                local.sync(Collections.singleton(name));
+            }
+
+            /**
+             * Implementation note - As we are decorating existing implementation
+             * we would need to ensure that we also override methods (non abstract)
+             * which might be implemented in say FSIndexInput like setLength
+             */
+            private class CopyOnCloseIndexOutput extends IndexOutput {
+                private final IndexOutput delegate;
+
+                public CopyOnCloseIndexOutput(IndexOutput delegate) {
+                    this.delegate = delegate;
+                }
+
+                @Override
+                public void flush() throws IOException {
+                    delegate.flush();
+                }
+
+                @Override
+                public void close() throws IOException {
+                    delegate.close();
+                    //Schedule this file to be copied in background
+                    addCopyTask(name);
+                }
+
+                @Override
+                public long getFilePointer() {
+                    return delegate.getFilePointer();
+                }
+
+                @Override
+                public void seek(long pos) throws IOException {
+                    delegate.seek(pos);
+                }
+
+                @Override
+                public long length() throws IOException {
+                    return delegate.length();
+                }
+
+                @Override
+                public void writeByte(byte b) throws IOException {
+                    delegate.writeByte(b);
+                }
+
+                @Override
+                public void writeBytes(byte[] b, int offset, int length) throws IOException {
+                    delegate.writeBytes(b, offset, length);
+                }
+
+                @Override
+                public void setLength(long length) throws IOException {
+                    delegate.setLength(length);
+                }
+            }
+        }
+    }
+
     private long startCopy(LocalIndexFile file) {
         updateMaxInProgress(copyInProgressCount.incrementAndGet());
         copyInProgressSize.addAndGet(file.size);
@@ -400,8 +836,16 @@ class IndexCopier implements CopyOnReadStatsMBean {
         copyInProgressCount.decrementAndGet();
         copyInProgressSize.addAndGet(-file.size);
 
-        downloadTime.addAndGet(System.currentTimeMillis() - start);
-        downloadSize.addAndGet(file.size);
+        if(file.copyFromRemote) {
+            downloadTime.addAndGet(System.currentTimeMillis() - start);
+            downloadSize.addAndGet(file.size);
+            downloadCount.incrementAndGet();
+        } else {
+            uploadSize.addAndGet(file.size);
+            uploadTime.addAndGet(System.currentTimeMillis() - start);
+            uploadCount.incrementAndGet();
+        }
+
     }
 
     private void updateMaxScheduled(int val) {
@@ -432,13 +876,18 @@ class IndexCopier implements CopyOnReadStatsMBean {
 
         @Override
         public void close() throws IOException {
-            try{
-                FileUtils.deleteDirectory(oldIndexDir);
-                log.debug("Removed old index content from {} ", oldIndexDir);
-            } catch (IOException e){
-                log.warn("Not able to remove old version of copied index at {}", oldIndexDir, e);
+            try {
+                super.close();
+            } finally {
+                //Clean out the local dir irrespective of any error occuring upon
+                //close in wrapped directory
+                try{
+                    FileUtils.deleteDirectory(oldIndexDir);
+                    log.debug("Removed old index content from {} ", oldIndexDir);
+                } catch (IOException e){
+                    log.warn("Not able to remove old version of copied index at {}", oldIndexDir, e);
+                }
             }
-            super.close();
         }
     }
     
@@ -446,17 +895,20 @@ class IndexCopier implements CopyOnReadStatsMBean {
         final File dir;
         final String name;
         final long size;
+        final boolean copyFromRemote;
         private volatile int deleteAttemptCount;
         final long creationTime = System.currentTimeMillis();
         
-        public LocalIndexFile(Directory dir, String fileName, long size){
+        public LocalIndexFile(Directory dir, String fileName,
+                              long size, boolean copyFromRemote){
+            this.copyFromRemote = copyFromRemote;
             this.dir = getFSDir(dir);
             this.name = fileName;
             this.size = size;
         }
 
         public LocalIndexFile(Directory dir, String fileName){
-            this(dir, fileName, getFileLength(dir, fileName));
+            this(dir, fileName, getFileLength(dir, fileName), true);
         }
 
         public String getKey(){
@@ -589,6 +1041,26 @@ class IndexCopier implements CopyOnReadStatsMBean {
     }
 
     @Override
+    public int getDownloadCount() {
+        return downloadCount.get();
+    }
+
+    @Override
+    public int getUploadCount() {
+        return uploadCount.get();
+    }
+
+    @Override
+    public String getUploadSize() {
+        return IOUtils.humanReadableByteCount(uploadSize.get());
+    }
+
+    @Override
+    public long getUploadTime() {
+        return uploadTime.get();
+    }
+
+    @Override
     public String getLocalIndexSize() {
         return IOUtils.humanReadableByteCount(FileUtils.sizeOfDirectory(indexRootDir));
     }
diff --git oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java
index ffd2d48..679dfeb 100644
--- oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java
+++ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexDefinition.java
@@ -200,6 +200,8 @@ class IndexDefinition implements Aggregate.AggregateMapper{
 
     private final int suggesterUpdateFrequencyMinutes;
 
+    private final long reindexCount;
+
     private final PathFilter pathFilter;
 
     @Nullable
@@ -267,6 +269,7 @@ class IndexDefinition implements Aggregate.AggregateMapper{
         this.maxExtractLength = determineMaxExtractLength();
         this.suggesterUpdateFrequencyMinutes = getOptionalValue(defn, LuceneIndexConstants.SUGGEST_UPDATE_FREQUENCY_MINUTES, 60);
         this.scorerProviderName = getOptionalValue(defn, LuceneIndexConstants.PROP_SCORER_PROVIDER, null);
+        this.reindexCount = determineReindexCount(defn, defnb);
         this.pathFilter = PathFilter.from(new ReadOnlyBuilder(defn));
         this.queryPaths = getQueryPaths(defn);
         this.saveDirListing = getOptionalValue(defn, LuceneIndexConstants.SAVE_DIR_LISTING, false);
@@ -297,10 +300,7 @@ class IndexDefinition implements Aggregate.AggregateMapper{
     }
 
     public long getReindexCount(){
-        if(definition.hasProperty(REINDEX_COUNT)){
-            return definition.getProperty(REINDEX_COUNT).getValue(Type.LONG);
-        }
-        return 0;
+        return reindexCount;
     }
 
     public long getEntryCount() {
@@ -609,6 +609,11 @@ class IndexDefinition implements Aggregate.AggregateMapper{
         return suggestEnabled;
     }
 
+    @CheckForNull
+    public String getIndexPathFromConfig() {
+        return definition.getString(LuceneIndexConstants.INDEX_PATH);
+    }
+
     public class IndexingRule {
         private final String baseNodeType;
         private final String nodeTypeName;
@@ -1149,6 +1154,9 @@ class IndexDefinition implements Aggregate.AggregateMapper{
     private static String determineIndexName(NodeState defn, String indexPath) {
         String indexName = defn.getString(PROP_NAME);
         if (indexName ==  null){
+            if (indexPath == null){
+                indexPath = defn.getString(LuceneIndexConstants.INDEX_PATH);
+            }
             if (indexPath != null) {
                 return indexPath;
             }
@@ -1345,4 +1353,16 @@ class IndexDefinition implements Aggregate.AggregateMapper{
         return defn.getChildNode(LuceneIndexConstants.INDEX_RULES).exists();
     }
 
+    private static long determineReindexCount(NodeState defn, NodeBuilder defnb) {
+        //Give precedence to count from builder as that reflects the latest state
+        //and might be higher than one from nodeState which is the base state
+        if (defnb != null && defnb.hasProperty(REINDEX_COUNT)) {
+            return defnb.getProperty(REINDEX_COUNT).getValue(Type.LONG);
+        }
+        if (defn.hasProperty(REINDEX_COUNT)) {
+            return defn.getProperty(REINDEX_COUNT).getValue(Type.LONG);
+        }
+        return 0;
+    }
+
 }
diff --git oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java
index 89bc068..e0a3d62 100644
--- oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java
+++ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java
@@ -48,7 +48,7 @@ class IndexNode {
         if (data.exists()) {
             directory = new OakDirectory(new ReadOnlyBuilder(data), definition, true);
             if (cloner != null){
-                directory = cloner.wrap(indexPath, definition, directory);
+                directory = cloner.wrapForRead(indexPath, definition, directory);
             }
         } else if (PERSISTENCE_FILE.equalsIgnoreCase(defnNodeState.getString(PERSISTENCE_NAME))) {
             String path = defnNodeState.getString(PERSISTENCE_PATH);
diff --git oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java
index 10a75d2..b60431d 100644
--- oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java
+++ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexConstants.java
@@ -262,4 +262,12 @@ public interface LuceneIndexConstants {
      * to allow faster reads (OAK-2809)
      */
     String SAVE_DIR_LISTING = "saveDirectoryListing";
+
+    /**
+     * Optional  Property to store the path of index in the repository. Path at which index
+     * definition is defined is not known to IndexEditor. To make use of CopyOnWrite
+     * feature its required to know the indexPath to optimize the lookup and read of
+     * existing index files
+     */
+    String INDEX_PATH = "indexPath";
 }
diff --git oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java
index 4669c2b..ef78876 100644
--- oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java
+++ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java
@@ -35,6 +35,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.annotation.Nullable;
+
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
@@ -116,12 +118,12 @@ public class LuceneIndexEditor implements IndexEditor, Aggregate.AggregateRoot {
     private final PathFilter.Result pathFilterResult;
 
     LuceneIndexEditor(NodeState root, NodeBuilder definition,
-        IndexUpdateCallback updateCallback) throws CommitFailedException {
+        IndexUpdateCallback updateCallback,@Nullable IndexCopier indexCopier) throws CommitFailedException {
         this.parent = null;
         this.name = null;
         this.path = "/";
         this.context = new LuceneIndexEditorContext(root, definition,
-                updateCallback);
+                updateCallback, indexCopier);
         this.root = root;
         this.isDeleted = false;
         this.matcherState = MatcherState.NONE;
diff --git oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java
index 4503e54..d276ebe 100644
--- oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java
+++ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java
@@ -28,6 +28,8 @@ import java.net.URL;
 import java.util.Calendar;
 import java.util.Set;
 
+import javax.annotation.Nullable;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
@@ -59,14 +61,16 @@ public class LuceneIndexEditorContext {
     private static final PerfLogger PERF_LOGGER =
             new PerfLogger(LoggerFactory.getLogger(LuceneIndexEditorContext.class.getName() + ".perf"));
 
-    static IndexWriterConfig getIndexWriterConfig(IndexDefinition definition) {
+    static IndexWriterConfig getIndexWriterConfig(IndexDefinition definition, boolean remoteDir) {
         // FIXME: Hack needed to make Lucene work in an OSGi environment
         Thread thread = Thread.currentThread();
         ClassLoader loader = thread.getContextClassLoader();
         thread.setContextClassLoader(IndexWriterConfig.class.getClassLoader());
         try {
             IndexWriterConfig config = new IndexWriterConfig(VERSION, definition.getAnalyzer());
-            config.setMergeScheduler(new SerialMergeScheduler());
+            if (remoteDir) {
+                config.setMergeScheduler(new SerialMergeScheduler());
+            }
             if (definition.getCodec() != null) {
                 config.setCodec(definition.getCodec());
             }
@@ -100,8 +104,6 @@ public class LuceneIndexEditorContext {
         }
     }
 
-    private final IndexWriterConfig config;
-
     private static final Parser defaultParser = createDefaultParser();
 
     private final IndexDefinition definition;
@@ -118,6 +120,10 @@ public class LuceneIndexEditorContext {
 
     private Parser parser;
 
+    @Nullable
+    private final IndexCopier indexCopier;
+
+
     private Directory directory;
 
     /**
@@ -125,10 +131,11 @@ public class LuceneIndexEditorContext {
      */
     private Set<MediaType> supportedMediaTypes;
 
-    LuceneIndexEditorContext(NodeState root, NodeBuilder definition, IndexUpdateCallback updateCallback) {
+    LuceneIndexEditorContext(NodeState root, NodeBuilder definition, IndexUpdateCallback updateCallback,
+                             @Nullable IndexCopier indexCopier) {
         this.definitionBuilder = definition;
+        this.indexCopier = indexCopier;
         this.definition = new IndexDefinition(root, definition);
-        this.config = getIndexWriterConfig(this.definition);
         this.indexedNodes = 0;
         this.updateCallback = updateCallback;
         if (this.definition.isOfOldFormat()){
@@ -147,6 +154,13 @@ public class LuceneIndexEditorContext {
         if (writer == null) {
             final long start = PERF_LOGGER.start();
             directory = newIndexDirectory(definition, definitionBuilder);
+            IndexWriterConfig config;
+            if (indexCopier != null){
+                directory = indexCopier.wrapForWrite(definition, directory);
+                config = getIndexWriterConfig(definition, false);
+            } else {
+                config = getIndexWriterConfig(definition, true);
+            }
             writer = new IndexWriter(directory, config);
             PERF_LOGGER.end(start, -1, "Created IndexWriter for directory {}", definition);
         }
diff --git oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
index 513538d..e115d9d 100644
--- oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
+++ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
@@ -17,11 +17,10 @@
 package org.apache.jackrabbit.oak.plugins.index.lucene;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE;
 
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Service;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditor;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
@@ -37,19 +36,29 @@ import org.apache.jackrabbit.oak.spi.state.NodeState;
  * @see IndexEditorProvider
  * 
  */
-@Component
-@Service(IndexEditorProvider.class)
 public class LuceneIndexEditorProvider implements IndexEditorProvider {
+    private final IndexCopier indexCopier;
+
+    public LuceneIndexEditorProvider() {
+        this(null);
+    }
+
+    public LuceneIndexEditorProvider(@Nullable IndexCopier indexCopier) {
+        this.indexCopier = indexCopier;
+    }
 
     @Override
     public Editor getIndexEditor(
-            @Nonnull String type, @Nonnull NodeBuilder definition, @Nonnull NodeState root, @Nonnull IndexUpdateCallback callback)
+            @Nonnull String type, @Nonnull NodeBuilder definition, @Nonnull NodeState root,
+            @Nonnull IndexUpdateCallback callback)
             throws CommitFailedException {
         if (TYPE_LUCENE.equals(type)) {
-            return new LuceneIndexEditor(root, definition, callback);
+            return new LuceneIndexEditor(root, definition, callback, indexCopier);
         }
         return null;
     }
 
-
+    IndexCopier getIndexCopier() {
+        return indexCopier;
+    }
 }
diff --git oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
index 116125c..aa9c385 100644
--- oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
+++ oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
@@ -20,9 +20,17 @@
 package org.apache.jackrabbit.oak.plugins.index.lucene;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nonnull;
 import javax.management.NotCompliantMBeanException;
 
 import com.google.common.base.Strings;
@@ -38,6 +46,7 @@ import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.apache.felix.scr.annotations.ReferencePolicyOption;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.aggregate.NodeAggregator;
 import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
 import org.apache.jackrabbit.oak.plugins.index.lucene.score.ScorerProviderFactory;
@@ -99,6 +108,13 @@ public class LuceneIndexProviderService {
     private static final String PROP_LOCAL_INDEX_DIR = "localIndexDir";
 
     @Property(
+            boolValue = false,
+            label = "Enable CopyOnWrite",
+            description = "Enable copying of Lucene index to local file system to improve index writer performance"
+    )
+    private static final String PROP_COPY_ON_WRITE = "enableCopyOnWriteSupport";
+
+    @Property(
             boolValue = true,
             label = "Open index asynchronously",
             description = "Enable opening of indexes in asynchronous mode"
@@ -107,20 +123,22 @@ public class LuceneIndexProviderService {
 
     private Whiteboard whiteboard;
 
-    private WhiteboardExecutor executor;
-
     private BackgroundObserver backgroundObserver;
 
     @Reference
     ScorerProviderFactory scorerFactory;
 
+    private IndexCopier indexCopier;
+
+    private File indexDir;
+
+    private ExecutorService executorService;
+
     @Activate
     private void activate(BundleContext bundleContext, Map<String, ?> config)
-            throws NotCompliantMBeanException {
+            throws NotCompliantMBeanException, IOException {
         initializeFactoryClassLoaders(getClass().getClassLoader());
         whiteboard = new OsgiWhiteboard(bundleContext);
-        executor = new WhiteboardExecutor();
-        executor.start(whiteboard);
 
         indexProvider = new LuceneIndexProvider(createTracker(bundleContext, config), scorerFactory);
         initializeLogging(config);
@@ -128,6 +146,7 @@ public class LuceneIndexProviderService {
 
         regs.add(bundleContext.registerService(QueryIndexProvider.class.getName(), indexProvider, null));
         registerObserver(bundleContext, config);
+        registerIndexEditor(bundleContext, config);
 
         oakRegs.add(registerMBean(whiteboard,
                 LuceneIndexMBean.class,
@@ -137,7 +156,7 @@ public class LuceneIndexProviderService {
     }
 
     @Deactivate
-    private void deactivate() {
+    private void deactivate() throws InterruptedException {
         for (ServiceRegistration reg : regs) {
             reg.unregister();
         }
@@ -155,8 +174,9 @@ public class LuceneIndexProviderService {
             indexProvider = null;
         }
 
-        if (executor != null){
-            executor.stop();
+        if (executorService != null){
+            executorService.shutdown();
+            executorService.awaitTermination(1, TimeUnit.MINUTES);
         }
 
         InfoStream.setDefault(InfoStream.NO_OUTPUT);
@@ -183,41 +203,96 @@ public class LuceneIndexProviderService {
         }
     }
 
-    private IndexTracker createTracker(BundleContext bundleContext, Map<String, ?> config) {
+    private void registerIndexEditor(BundleContext bundleContext, Map<String, ?> config) throws IOException {
+        boolean enableCopyOnWrite = PropertiesUtil.toBoolean(config.get(PROP_COPY_ON_WRITE), false);
+        LuceneIndexEditorProvider editorProvider;
+        if (enableCopyOnWrite){
+            initializeIndexCopier(bundleContext, config);
+            editorProvider = new LuceneIndexEditorProvider(indexCopier);
+            log.info("Enabling CopyOnWrite support. Index files would be copied under {}", indexDir.getAbsolutePath());
+        } else {
+            editorProvider = new LuceneIndexEditorProvider();
+        }
+        regs.add(bundleContext.registerService(IndexEditorProvider.class.getName(), editorProvider, null));
+    }
+
+    private IndexTracker createTracker(BundleContext bundleContext, Map<String, ?> config) throws IOException {
         boolean enableCopyOnRead = PropertiesUtil.toBoolean(config.get(PROP_COPY_ON_READ), true);
         if (enableCopyOnRead){
-            String indexDirPath = PropertiesUtil.toString(config.get(PROP_LOCAL_INDEX_DIR), null);
-            if (Strings.isNullOrEmpty(indexDirPath)) {
-                String repoHome = bundleContext.getProperty(REPOSITORY_HOME);
-                if (repoHome != null){
-                    indexDirPath = FilenameUtils.concat(repoHome, "index");
-                }
+            initializeIndexCopier(bundleContext, config);
+            log.info("Enabling CopyOnRead support. Index files would be copied under {}", indexDir.getAbsolutePath());
+            return new IndexTracker(indexCopier);
+        }
+
+        return new IndexTracker();
+    }
+
+    private void initializeIndexCopier(BundleContext bundleContext, Map<String, ?> config) throws IOException {
+        if(indexCopier != null){
+            return;
+        }
+        String indexDirPath = PropertiesUtil.toString(config.get(PROP_LOCAL_INDEX_DIR), null);
+        if (Strings.isNullOrEmpty(indexDirPath)) {
+            String repoHome = bundleContext.getProperty(REPOSITORY_HOME);
+            if (repoHome != null){
+                indexDirPath = FilenameUtils.concat(repoHome, "index");
             }
+        }
 
-            checkNotNull(indexDirPath, "Index directory cannot be determined as neither index " +
-                    "directory path [%s] nor repository home [%s] defined", PROP_LOCAL_INDEX_DIR, REPOSITORY_HOME);
+        checkNotNull(indexDirPath, "Index directory cannot be determined as neither index " +
+                "directory path [%s] nor repository home [%s] defined", PROP_LOCAL_INDEX_DIR, REPOSITORY_HOME);
 
-            File indexDir = new File(indexDirPath);
-            IndexCopier copier = new IndexCopier(executor, indexDir);
-            log.info("Enabling CopyOnRead support. Index files would be copied under {}", indexDir.getAbsolutePath());
+        indexDir = new File(indexDirPath);
+        indexCopier = new IndexCopier(getExecutorService(), indexDir);
 
-            oakRegs.add(registerMBean(whiteboard,
-                    CopyOnReadStatsMBean.class,
-                    copier,
-                    CopyOnReadStatsMBean.TYPE,
-                    "CopyOnRead support statistics"));
+        oakRegs.add(registerMBean(whiteboard,
+                CopyOnReadStatsMBean.class,
+                indexCopier,
+                CopyOnReadStatsMBean.TYPE,
+                "CopyOnRead support statistics"));
 
-            return new IndexTracker(copier);
+    }
+
+    private ExecutorService getExecutorService(){
+        if (executorService == null){
+            executorService = createExecutor();
         }
+        return executorService;
+    }
 
-        return new IndexTracker();
+    private ExecutorService createExecutor() {
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 5, 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+            private final AtomicInteger counter = new AtomicInteger();
+            private final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
+                @Override
+                public void uncaughtException(Thread t, Throwable e) {
+                    log.warn("Error occurred in asynchronous processing ", e);
+                }
+            };
+            @Override
+            public Thread newThread(@Nonnull Runnable r) {
+                Thread thread = new Thread(r, createName());
+                thread.setDaemon(true);
+                thread.setPriority(Thread.MIN_PRIORITY);
+                thread.setUncaughtExceptionHandler(handler);
+                return thread;
+            }
+
+            private String createName() {
+                return "oak-lucene-" + counter.getAndIncrement();
+            }
+        });
+        executor.setKeepAliveTime(1, TimeUnit.MINUTES);
+        executor.allowCoreThreadTimeOut(true);
+        return executor;
     }
 
     private void registerObserver(BundleContext bundleContext, Map<String, ?> config) {
         boolean enableAsyncIndexOpen = PropertiesUtil.toBoolean(config.get(PROP_ASYNC_INDEX_OPEN), true);
         Observer observer = indexProvider;
         if (enableAsyncIndexOpen) {
-            backgroundObserver = new BackgroundObserver(indexProvider, executor, 5);
+            backgroundObserver = new BackgroundObserver(indexProvider, getExecutorService(), 5);
             observer = backgroundObserver;
             oakRegs.add(registerMBean(whiteboard,
                     BackgroundObserverMBean.class,
diff --git oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
index 171f967..cf9a3c4 100644
--- oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
+++ oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
@@ -20,16 +20,21 @@
 package org.apache.jackrabbit.oak.plugins.index.lucene;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.management.openmbean.TabularData;
 
@@ -53,6 +58,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
 import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_COUNT;
 import static org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent.INITIAL_CONTENT;
@@ -79,7 +85,7 @@ public class IndexCopierTest {
         IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), getWorkDir());
 
         Directory remote = new RAMDirectory();
-        Directory wrapped = c1.wrap("/foo" , defn, remote);
+        Directory wrapped = c1.wrapForRead("/foo", defn, remote);
 
         byte[] t1 = writeFile(remote , "t1");
         byte[] t2 = writeFile(remote , "t2");
@@ -104,7 +110,7 @@ public class IndexCopierTest {
         IndexCopier c1 = new IndexCopier(sameThreadExecutor(), getWorkDir());
 
         Directory remote = new RAMDirectory();
-        Directory wrapped = c1.wrap("/foo" , defn, remote);
+        Directory wrapped = c1.wrapForRead("/foo", defn, remote);
 
         byte[] t1 = writeFile(remote, "t1");
         byte[] t2 = writeFile(remote , "t2");
@@ -135,9 +141,9 @@ public class IndexCopierTest {
         IndexCopier c1 = new IndexCopier(sameThreadExecutor(), getWorkDir());
 
         Directory remote = new CloseSafeDir();
-        Directory w1 = c1.wrap("/foo" , defn, remote);
+        Directory w1 = c1.wrapForRead("/foo", defn, remote);
 
-        byte[] t1 = writeFile(remote , "t1");
+        byte[] t1 = writeFile(remote, "t1");
         byte[] t2 = writeFile(remote , "t2");
 
         readAndAssert(w1, "t1", t1);
@@ -154,7 +160,7 @@ public class IndexCopierTest {
         //Close old version
         w1.close();
         //Get a new one with updated reindexCount
-        Directory w2 = c1.wrap("/foo" , defn, remote);
+        Directory w2 = c1.wrapForRead("/foo", defn, remote);
 
         readAndAssert(w2, "t1", t1);
 
@@ -174,7 +180,7 @@ public class IndexCopierTest {
         IndexCopier c1 = new RAMIndexCopier(baseDir, executor, getWorkDir());
 
         TestRAMDirectory remote = new TestRAMDirectory();
-        Directory wrapped = c1.wrap("/foo", defn, remote);
+        Directory wrapped = c1.wrapForRead("/foo", defn, remote);
 
         byte[] t1 = writeFile(remote , "t1");
 
@@ -237,7 +243,7 @@ public class IndexCopierTest {
                 super.copy(to, src, dest, context);
             }
         };
-        Directory wrapped = c1.wrap("/foo", defn, remote);
+        Directory wrapped = c1.wrapForRead("/foo", defn, remote);
 
         byte[] t1 = writeFile(remote , "t1");
 
@@ -281,7 +287,7 @@ public class IndexCopierTest {
         IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), getWorkDir());
 
         TestRAMDirectory remote = new TestRAMDirectory();
-        Directory wrapped = c1.wrap("/foo" , defn, remote);
+        Directory wrapped = c1.wrapForRead("/foo", defn, remote);
 
         byte[] t1 = writeFile(remote, "t1");
 
@@ -290,7 +296,7 @@ public class IndexCopierTest {
         assertEquals(1, remote.openedFiles.size());
 
         //2. Reuse the testDir and read again
-        Directory wrapped2 = c1.wrap("/foo", defn, remote);
+        Directory wrapped2 = c1.wrapForRead("/foo", defn, remote);
         remote.reset();
 
         //3. Now read should be served from local
@@ -298,7 +304,7 @@ public class IndexCopierTest {
         assertEquals(0, remote.openedFiles.size());
 
         //Now check if local file gets corrupted then read from remote
-        Directory wrapped3 = c1.wrap("/foo" , defn, remote);
+        Directory wrapped3 = c1.wrapForRead("/foo", defn, remote);
         remote.reset();
 
         //4. Corrupt the local copy
@@ -323,7 +329,7 @@ public class IndexCopierTest {
         };
 
         String fileName = "failed.txt";
-        Directory wrapped = c1.wrap("/foo" , defn, remote);
+        Directory wrapped = c1.wrapForRead("/foo", defn, remote);
 
         byte[] t1 = writeFile(remote , fileName);
 
@@ -354,7 +360,7 @@ public class IndexCopierTest {
         byte[] t1 = writeFile(r1 , "t1");
         byte[] t2 = writeFile(r1 , "t2");
 
-        Directory w1 = c1.wrap("/foo" , defn, r1);
+        Directory w1 = c1.wrapForRead("/foo", defn, r1);
         readAndAssert(w1, "t1", t1);
         readAndAssert(w1, "t2", t2);
 
@@ -366,7 +372,7 @@ public class IndexCopierTest {
         copy(r1, r2);
         r2.deleteFile("t1");
 
-        Directory w2 = c1.wrap("/foo" , defn, r2);
+        Directory w2 = c1.wrapForRead("/foo", defn, r2);
 
         //Close would trigger removal of file which are not present in remote
         w2.close();
@@ -397,7 +403,7 @@ public class IndexCopierTest {
         byte[] t1 = writeFile(r1, "t1");
         byte[] t2 = writeFile(r1 , "t2");
 
-        Directory w1 = c1.wrap("/foo" , defn, r1);
+        Directory w1 = c1.wrapForRead("/foo", defn, r1);
         readAndAssert(w1, "t1", t1);
         readAndAssert(w1, "t2", t2);
 
@@ -409,7 +415,7 @@ public class IndexCopierTest {
         copy(r1, r2);
         r2.deleteFile("t1");
 
-        Directory w2 = c1.wrap("/foo" , defn, r2);
+        Directory w2 = c1.wrapForRead("/foo", defn, r2);
 
         //Close would trigger removal of file which are not present in remote
         testFiles.add("t1");
@@ -422,20 +428,282 @@ public class IndexCopierTest {
         assertEquals(IOUtils.humanReadableByteCount(t1.length), c1.getGarbageSize());
         assertEquals(1, c1.getGarbageDetails().length);
 
-        Directory w3 = c1.wrap("/foo" , defn, r2);
+        Directory w3 = c1.wrapForRead("/foo", defn, r2);
         w3.close();
         assertEquals(2, testFile.getDeleteAttemptCount());
 
         //Now let the file to be deleted
         testFiles.clear();
 
-        Directory w4 = c1.wrap("/foo" , defn, r2);
+        Directory w4 = c1.wrapForRead("/foo", defn, r2);
         w4.close();
 
         //No pending deletes left
         assertEquals(0, c1.getFailedToDeleteFiles().size());
     }
 
+    @Test
+    public void copyOnWriteBasics() throws Exception{
+        Directory baseDir = new CloseSafeDir();
+        IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+        IndexCopier copier = new RAMIndexCopier(baseDir, sameThreadExecutor(), getWorkDir());
+
+        Directory remote = new RAMDirectory();
+        byte[] t1 = writeFile(remote, "t1");
+
+        //State of remote directory should set before wrapping as later
+        //additions would not be picked up given COW assume remote directory
+        //to be read only
+        Directory local = copier.wrapForWrite(defn, remote);
+
+        assertEquals(newHashSet("t1"), newHashSet(local.listAll()));
+        assertEquals(t1.length, local.fileLength("t1"));
+
+        byte[] t2 = writeFile(local, "t2");
+        assertEquals(newHashSet("t1", "t2"), newHashSet(local.listAll()));
+        assertEquals(t2.length, local.fileLength("t2"));
+
+        assertTrue(local.fileExists("t1"));
+        assertTrue(local.fileExists("t2"));
+
+        assertTrue("t2 should be copied to remote", remote.fileExists("t2"));
+
+        readAndAssert(local, "t1", t1);
+        readAndAssert(local, "t2", t2);
+
+        local.deleteFile("t1");
+        assertEquals(newHashSet("t2"), newHashSet(local.listAll()));
+
+        local.deleteFile("t2");
+        assertEquals(newHashSet(), newHashSet(local.listAll()));
+
+
+        try {
+            local.fileLength("nonExistentFile");
+            fail();
+        } catch (FileNotFoundException ignore) {
+
+        }
+
+        try {
+            local.openInput("nonExistentFile", IOContext.DEFAULT);
+            fail();
+        } catch (FileNotFoundException ignore) {
+
+        }
+
+        local.close();
+        assertFalse(baseDir.fileExists("t2"));
+    }
+
+    /**
+     * Checks for the case where if the file exist local before writer starts
+     * then those files do not get deleted even if deleted by writer via
+     * indexing process from 'baseDir' as they might be in use by existing open
+     * indexes
+     */
+    @Test
+    public void cowExistingLocalFileNotDeleted() throws Exception{
+        Directory baseDir = new CloseSafeDir();
+        IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+        IndexCopier copier = new RAMIndexCopier(baseDir, sameThreadExecutor(), getWorkDir());
+
+        Directory remote = new CloseSafeDir();
+        byte[] t1 = writeFile(remote, "t1");
+        byte[] t2 = writeFile(remote, "t2");
+        Directory local = copier.wrapForWrite(defn, remote);
+        assertEquals(newHashSet("t1", "t2"), newHashSet(local.listAll()));
+
+        byte[] t3 = writeFile(local, "t3");
+
+        //Now pull in the file t1 via CopyOnRead in baseDir
+        Directory localForRead = copier.wrapForRead("/foo", defn, remote);
+        readAndAssert(localForRead, "t1", t1);
+
+        //File which was copied from remote should not be deleted from baseDir
+        //upon delete from local
+        assertTrue(baseDir.fileExists("t1"));
+        local.deleteFile("t1");
+        assertFalse("t1 should be deleted from remote", remote.fileExists("t1"));
+        assertFalse("t1 should be deleted from 'local' view also", local.fileExists("t1"));
+        assertTrue("t1 should not be deleted from baseDir", baseDir.fileExists("t1"));
+
+        //File which was created only via local SHOULD get removed from
+        //baseDir only upon close
+        assertTrue(baseDir.fileExists("t3"));
+        local.deleteFile("t3");
+        assertFalse("t1 should be deleted from remote", local.fileExists("t3"));
+        assertTrue("t1 should NOT be deleted from remote", baseDir.fileExists("t3"));
+
+        local.close();
+        assertFalse("t3 should also be deleted from local", baseDir.fileExists("t3"));
+    }
+
+    @Test
+    public void cowReadDoneFromLocalIfFileExist() throws Exception{
+        final Set<String> readLocal = newHashSet();
+        Directory baseDir = new CloseSafeDir(){
+            @Override
+            public IndexInput openInput(String name, IOContext context) throws IOException {
+                readLocal.add(name);
+                return super.openInput(name, context);
+            }
+        };
+        IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+        IndexCopier copier = new RAMIndexCopier(baseDir, sameThreadExecutor(), getWorkDir());
+
+        final Set<String> readRemotes = newHashSet();
+        Directory remote = new RAMDirectory() {
+            @Override
+            public IndexInput openInput(String name, IOContext context) throws IOException {
+                readRemotes.add(name);
+                return super.openInput(name, context);
+            }
+        };
+        byte[] t1 = writeFile(remote, "t1");
+        Directory local = copier.wrapForWrite(defn, remote);
+
+        //Read should be served from remote
+        readRemotes.clear();readLocal.clear();
+        readAndAssert(local, "t1", t1);
+        assertEquals(newHashSet("t1"), readRemotes);
+        assertEquals(newHashSet(), readLocal);
+
+        //Now pull in the file t1 via CopyOnRead in baseDir
+        Directory localForRead = copier.wrapForRead("/foo", defn, remote);
+        readAndAssert(localForRead, "t1", t1);
+
+        //Read should be served from local
+        readRemotes.clear();readLocal.clear();
+        readAndAssert(local, "t1", t1);
+        assertEquals(newHashSet(), readRemotes);
+        assertEquals(newHashSet("t1"), readLocal);
+
+        local.close();
+    }
+
+    @Test
+    public void cowCopyDoneOnClose() throws Exception{
+        final CollectingExecutor executor = new CollectingExecutor();
+        Directory baseDir = new CloseSafeDir();
+        IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+        IndexCopier copier = new RAMIndexCopier(baseDir, executor, getWorkDir());
+
+        Directory remote = new CloseSafeDir();
+
+        final Directory local = copier.wrapForWrite(defn, remote);
+        byte[] t1 = writeFile(local, "t1");
+
+        assertTrue(local.fileExists("t1"));
+        assertFalse("t1 should NOT be copied to remote", remote.fileExists("t1"));
+
+        //Execute all job
+        executor.executeAll();
+
+        assertTrue("t1 should now be copied to remote", remote.fileExists("t1"));
+
+        byte[] t2 = writeFile(local, "t2");
+        assertFalse("t2 should NOT be copied to remote", remote.fileExists("t2"));
+
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        final CountDownLatch copyLatch = new CountDownLatch(1);
+        Future<?> copyTasks = executorService.submit(new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                copyLatch.await();
+                executor.executeAll();
+                return null;
+            }
+        });
+
+        final CountDownLatch closeLatch = new CountDownLatch(1);
+        Future<?> closeTasks = executorService.submit(new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                closeLatch.await();
+                local.close();
+                return null;
+            }
+        });
+
+        closeLatch.countDown();
+        assertFalse("t2 should NOT be copied to remote", remote.fileExists("t2"));
+
+        //Let copy to proceed
+        copyLatch.countDown();
+
+        //Now wait for close to finish
+        closeTasks.get();
+        assertTrue("t2 should now be copied to remote", remote.fileExists("t2"));
+
+        executorService.shutdown();
+    }
+
+    @Test
+    public void cowCopyDoneOnCloseExceptionHandling() throws Exception{
+        final CollectingExecutor executor = new CollectingExecutor();
+        Directory baseDir = new CloseSafeDir();
+        IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+        IndexCopier copier = new RAMIndexCopier(baseDir, executor, getWorkDir());
+
+        Directory remote = new CloseSafeDir();
+
+        final Directory local = copier.wrapForWrite(defn, remote);
+        byte[] t1 = writeFile(local, "t1");
+
+        assertTrue(local.fileExists("t1"));
+        assertFalse("t1 should NOT be copied to remote", remote.fileExists("t1"));
+
+        //Execute all job
+        executor.executeAll();
+
+        assertTrue("t1 should now be copied to remote", remote.fileExists("t1"));
+
+        byte[] t2 = writeFile(local, "t2");
+        assertFalse("t2 should NOT be copied to remote", remote.fileExists("t2"));
+
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        final CountDownLatch copyLatch = new CountDownLatch(1);
+        Future<?> copyTasks = executorService.submit(new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                copyLatch.await();
+                executor.executeAll();
+                executor.enableImmediateExecution();
+                return null;
+            }
+        });
+
+        final CountDownLatch closeLatch = new CountDownLatch(1);
+        Future<?> closeTasks = executorService.submit(new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                closeLatch.await();
+                local.close();
+                return null;
+            }
+        });
+
+        closeLatch.countDown();
+        assertFalse("t2 should NOT be copied to remote", remote.fileExists("t2"));
+
+        //Let copy to proceed
+        copyLatch.countDown();
+
+        //Now wait for close to finish
+        closeTasks.get();
+        assertTrue("t2 should now be copied to remote", remote.fileExists("t2"));
+
+        executorService.shutdown();
+    }
+
+    //TODO Test for failure in copy
+
+    @Test
+    public void cowIndexPathNotDefined() throws Exception{
+
+    }
+
     private byte[] writeFile(Directory dir, String name) throws IOException {
         byte[] data = randomBytes(rnd.nextInt(maxFileSize) + 1);
         IndexOutput o = dir.createOutput(name, IOContext.DEFAULT);
@@ -470,13 +738,18 @@ public class IndexCopierTest {
     private class RAMIndexCopier extends IndexCopier {
         final Directory baseDir;
 
-        public RAMIndexCopier(Directory baseDir, Executor executor, File indexRootDir) {
+        public RAMIndexCopier(Directory baseDir, Executor executor, File indexRootDir) throws IOException {
             super(executor, indexRootDir);
             this.baseDir = baseDir;
         }
 
         @Override
-        protected Directory createLocalDir(String indexPath, IndexDefinition definition) throws IOException {
+        protected Directory createLocalDirForIndexReader(String indexPath, IndexDefinition definition) throws IOException {
+            return baseDir;
+        }
+
+        @Override
+        protected Directory createLocalDirForIndexWriter(IndexDefinition definition) throws IOException {
             return baseDir;
         }
     }
@@ -503,18 +776,32 @@ public class IndexCopierTest {
     }
 
     private static class CollectingExecutor implements Executor {
-        final List<Runnable> commands = newArrayList();
+        final BlockingQueue<Runnable> commands = new LinkedBlockingQueue<Runnable>();
+        private boolean immediateExecution = false;
 
         @Override
         public void execute(Runnable command) {
-            commands.add(command);
+            if (immediateExecution){
+                command.run();
+            } else {
+                commands.add(command);
+            }
         }
 
         void executeAll(){
-            for (Runnable c : commands) {
+            Runnable c;
+            while ((c = commands.poll()) != null){
                 c.run();
             }
         }
+
+        void enableImmediateExecution(){
+            immediateExecution = true;
+        }
+
+        void enableDelayedExecution(){
+            immediateExecution = false;
+        }
     }
 
 }
diff --git oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java
index 0c9909c..a63b083 100644
--- oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java
+++ oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java
@@ -22,16 +22,20 @@ package org.apache.jackrabbit.oak.plugins.index.lucene;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
 import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
 import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
+import org.apache.lucene.util.InfoStream;
 import org.apache.sling.testing.mock.osgi.MockOsgi;
 import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class LuceneIndexProviderServiceTest {
@@ -50,10 +54,16 @@ public class LuceneIndexProviderServiceTest {
 
         assertNotNull(context.getService(QueryIndexProvider.class));
         assertNotNull(context.getService(Observer.class));
+        assertNotNull(context.getService(IndexEditorProvider.class));
 
-        assertNotNull("CopyOnRead should be enabled by default",context.getService(CopyOnReadStatsMBean.class));
+        LuceneIndexEditorProvider editorProvider =
+                (LuceneIndexEditorProvider) context.getService(IndexEditorProvider.class);
+        assertNull(editorProvider.getIndexCopier());
+
+        assertNotNull("CopyOnRead should be enabled by default", context.getService(CopyOnReadStatsMBean.class));
 
         assertTrue(context.getService(Observer.class) instanceof BackgroundObserver);
+        assertEquals(InfoStream.NO_OUTPUT, InfoStream.getDefault());
 
         MockOsgi.deactivate(service);
     }
@@ -69,6 +79,31 @@ public class LuceneIndexProviderServiceTest {
         MockOsgi.deactivate(service);
     }
 
+    @Test
+    public void enableCopyOnWrite() throws Exception{
+        Map<String,Object> config = getDefaultConfig();
+        config.put("enableCopyOnWriteSupport", true);
+        MockOsgi.activate(service, context.bundleContext(), config);
+
+        LuceneIndexEditorProvider editorProvider =
+                (LuceneIndexEditorProvider) context.getService(IndexEditorProvider.class);
+
+        assertNotNull(editorProvider);
+        assertNotNull(editorProvider.getIndexCopier());
+
+        MockOsgi.deactivate(service);
+    }
+
+    @Test
+    public void debugLogging() throws Exception{
+        Map<String,Object> config = getDefaultConfig();
+        config.put("debug", true);
+        MockOsgi.activate(service, context.bundleContext(), config);
+
+        assertEquals(LoggingInfoStream.INSTANCE, InfoStream.getDefault());
+        MockOsgi.deactivate(service);
+    }
+
     private Map<String,Object> getDefaultConfig(){
         Map<String,Object> config = new HashMap<String, Object>();
         config.put("localIndexDir", folder.getRoot().getAbsolutePath());
diff --git oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexTest.java oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexTest.java
index 13cbd39..ce2774a 100644
--- oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexTest.java
+++ oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexTest.java
@@ -341,7 +341,7 @@ public class LuceneIndexTest {
     }
 
     private void purgeDeletedDocs(NodeBuilder idx, IndexDefinition definition) throws IOException {
-        IndexWriter writer = new IndexWriter(newIndexDirectory(definition, idx), getIndexWriterConfig(definition));
+        IndexWriter writer = new IndexWriter(newIndexDirectory(definition, idx), getIndexWriterConfig(definition, true));
         writer.forceMergeDeletes();
         writer.close();
     }
diff --git oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
index 75945a9..7db163f 100644
--- oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
+++ oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LucenePropertyIndexTest.java
@@ -19,12 +19,15 @@
 
 package org.apache.jackrabbit.oak.plugins.index.lucene;
 
+import java.io.IOException;
 import java.text.ParseException;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import javax.jcr.PropertyType;
 
@@ -53,7 +56,10 @@ import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
 import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
 import org.apache.jackrabbit.util.ISO8601;
+import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import static com.google.common.collect.ImmutableSet.of;
 import static java.util.Arrays.asList;
@@ -88,6 +94,11 @@ public class LucenePropertyIndexTest extends AbstractQueryTest {
      */
     static final int NUMBER_OF_NODES = LucenePropertyIndex.LUCENE_QUERY_BATCH_SIZE * 2;
 
+    private ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
     @Override
     protected void createTestIndexNode() throws Exception {
         setTraversalEnabled(false);
@@ -101,12 +112,25 @@ public class LucenePropertyIndexTest extends AbstractQueryTest {
                 .with(new OpenSecurityProvider())
                 .with((QueryIndexProvider) provider)
                 .with((Observer) provider)
-                .with(new LuceneIndexEditorProvider())
+                .with(new LuceneIndexEditorProvider(createIndexCopier()))
                 .with(new PropertyIndexEditorProvider())
                 .with(new NodeTypeIndexProvider())
                 .createContentRepository();
     }
 
+    private IndexCopier createIndexCopier() {
+        try {
+            return new IndexCopier(executorService, temporaryFolder.getRoot());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @After
+    public void shutdownExecutor(){
+        executorService.shutdown();
+    }
+
     @Test
     public void fulltextSearchWithCustomAnalyzer() throws Exception {
         Tree idx = createFulltextIndex(root.getTree("/"), "test");
