diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
index 0674da9..bcbf9e4 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
@@ -642,6 +642,7 @@ public class Oak {
                 AsyncIndexUpdate task = new AsyncIndexUpdate(t.getKey(), store,
                         indexEditors);
                 indexRegistration.registerAsyncIndexer(task, t.getValue());
+                closer.register(task);
             }
 
             PropertyIndexAsyncReindex asyncPI = new PropertyIndexAsyncReindex(
diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
index e5baf9e..78c5c69 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
@@ -27,10 +27,13 @@ import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPE
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
 
+import java.io.Closeable;
 import java.util.Calendar;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -74,7 +77,7 @@ import com.google.common.base.Splitter;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 
-public class AsyncIndexUpdate implements Runnable {
+public class AsyncIndexUpdate implements Runnable, Closeable {
 
     private static final Logger log = LoggerFactory
             .getLogger(AsyncIndexUpdate.class);
@@ -90,6 +93,9 @@ public class AsyncIndexUpdate implements Runnable {
     private static final CommitFailedException CONCURRENT_UPDATE = new CommitFailedException(
             "Async", 1, "Concurrent update detected");
 
+    private static final CommitFailedException INTERRUPTED = new CommitFailedException(
+            "Async", 1, "Indexing stopped");
+
     /**
      * Timeout in milliseconds after which an async job would be considered as
      * timed out. Another node in cluster would wait for timeout before
@@ -139,6 +145,10 @@ public class AsyncIndexUpdate implements Runnable {
 
     private final IndexTaskSpliter taskSplitter = new IndexTaskSpliter();
 
+    private final Semaphore runPermit = new Semaphore(1);
+
+    private final AtomicBoolean stopFlag = new AtomicBoolean();
+
     private IndexMBeanRegistration mbeanRegistration;
 
     private long leaseTimeOut;
@@ -151,6 +161,8 @@ public class AsyncIndexUpdate implements Runnable {
     private static long ERROR_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer
             .getInteger("oak.async.warn.interval", 30));
 
+    private int softTimeOut = 2 * 60;
+
     public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
             @Nonnull IndexEditorProvider provider, boolean switchOnSync) {
         this.name = checkNotNull(name);
@@ -194,14 +206,17 @@ public class AsyncIndexUpdate implements Runnable {
 
         private final AsyncIndexStats indexStats;
 
+        private final AtomicBoolean stopFlag;
+
         /** Expiration time of the last lease we committed */
         private long lease;
 
         public AsyncUpdateCallback(NodeStore store, String name,
                 long leaseTimeOut, String checkpoint, String afterCheckpoint,
-                AsyncIndexStats indexStats) {
+                AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
             this.store = store;
             this.name = name;
+            this.stopFlag = stopFlag;
             this.leaseTimeOut = leaseTimeOut;
             this.checkpoint = checkpoint;
             this.afterCheckpoint = afterCheckpoint;
@@ -268,6 +283,10 @@ public class AsyncIndexUpdate implements Runnable {
 
         @Override
         public void indexUpdate() throws CommitFailedException {
+            if (stopFlag.get()){
+                throw INTERRUPTED;
+            }
+
             if (indexStats.incUpdates() % 100 == 0) {
                 long now = System.currentTimeMillis();
                 if (now + leaseTimeOut > lease) {
@@ -283,9 +302,54 @@ public class AsyncIndexUpdate implements Runnable {
 
     @Override
     public synchronized void run() {
+        boolean permitAcquired = false;
+        try{
+            if (runPermit.tryAcquire()){
+                permitAcquired = true;
+                runWhenPermitted();
+            } else {
+                log.warn("[{}] Could not acquire run permit. Stop flag set to [{}] Skipping the run", name, stopFlag);
+            }
+        } finally {
+            if (permitAcquired){
+                runPermit.release();
+            }
+        }
+    }
+
+
+    @Override
+    public void close() {
+        int hardTimeOut = 5 * softTimeOut;
+        if(!runPermit.tryAcquire()){
+            log.info("[{}] [WAITING] Indexing in progress. Would wait for {} secs for it to finish", name,
+                    softTimeOut);
+            try {
+                if(!runPermit.tryAcquire(softTimeOut, TimeUnit.SECONDS)){
+                    log.warn("[{}] [SOFT LIMIT HIT] Indexing found to be in progress for more than [{}]s. Would " +
+                            "signal it to now stop", name, softTimeOut);
+                    stopFlag.set(true);
+                    if(!runPermit.tryAcquire(hardTimeOut, TimeUnit.SECONDS)){
+                        log.warn("[{}] Indexing still not found to be complete. Giving up after [{}]s", name, hardTimeOut);
+                    } else {
+                        log.warn("[{}] [CLOSED EXCEPTION] Indexing run closed with exception", name);
+                    }
+                } else {
+                    log.info("[{}] [CLOSED OK]Async indexing run completed. Closing it now", name);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        } else {
+            log.info("[{}] Closed", name);
+        }
+    }
+
+    private void runWhenPermitted() {
         if (indexStats.isPaused()) {
             return;
         }
+
         log.debug("[{}] Running background index task", name);
 
         NodeState root = store.getRoot();
@@ -396,10 +460,11 @@ public class AsyncIndexUpdate implements Runnable {
     }
 
     protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
-            String name, long leaseTimeOut, String beforeCheckpoint,
-            String afterCheckpoint, AsyncIndexStats indexStats) {
+                                                         String name, long leaseTimeOut, String beforeCheckpoint,
+                                                         String afterCheckpoint, AsyncIndexStats indexStats,
+                                                         AtomicBoolean stopFlag) {
         return new AsyncUpdateCallback(store, name, leaseTimeOut,
-                beforeCheckpoint, afterCheckpoint, indexStats);
+                beforeCheckpoint, afterCheckpoint, indexStats, stopFlag);
     }
 
     private boolean updateIndex(NodeState before, String beforeCheckpoint,
@@ -411,7 +476,7 @@ public class AsyncIndexUpdate implements Runnable {
         // create an update callback for tracking index updates
         // and maintaining the update lease
         AsyncUpdateCallback callback = newAsyncUpdateCallback(store, name,
-                leaseTimeOut, beforeCheckpoint, afterCheckpoint, indexStats);
+                leaseTimeOut, beforeCheckpoint, afterCheckpoint, indexStats, stopFlag);
         callback.prepare();
 
         // check for index tasks split requests, if a split happened, make
@@ -535,6 +600,19 @@ public class AsyncIndexUpdate implements Runnable {
         return this;
     }
 
+    protected AsyncIndexUpdate setCloseTimeOut(int timeOutInSec) {
+        this.softTimeOut = timeOutInSec;
+        return this;
+    }
+
+    boolean isClosed(){
+        return stopFlag.get();
+    }
+
+    boolean isClosing(){
+        return runPermit.hasQueuedThreads();
+    }
+
     private static void preAsyncRunStatsStats(AsyncIndexStats stats) {
         stats.start(now());
     }
diff --git oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
index 80555de..b60c357 100644
--- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
+++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
@@ -23,17 +23,22 @@ import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFIN
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
 import static org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider.TYPE;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1027,4 +1032,175 @@ public class AsyncIndexUpdateTest {
         customLogs.finished();
     }
 
+    @Test
+    public void noRunWhenClosed() throws Exception{
+        NodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+
+        AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+        async.run();
+
+        async.close();
+        LogCustomizer lc = createLogCustomizer(Level.WARN);
+        async.run();
+        assertEquals(1, lc.getLogs().size());
+        assertThat(lc.getLogs().get(0), containsString("Could not acquire run permit"));
+
+        lc.finished();
+    }
+
+    @Test
+    public void closeWithSoftLimit() throws Exception{
+        NodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot").setProperty("foo", "abc");
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        final Semaphore asyncLock = new Semaphore(1);
+        final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
+            @Override
+            protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
+                                                                 String beforeCheckpoint, String afterCheckpoint,
+                                                                 AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
+                try {
+                    asyncLock.acquire();
+                } catch (InterruptedException ignore) {
+                }
+                return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+                        indexStats, stopFlag);
+            }
+        };
+
+        async.setCloseTimeOut(1000);
+
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                async.run();
+            }
+        });
+
+        Thread closer = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                async.close();
+            }
+        });
+
+        asyncLock.acquire();
+        t.start();
+
+        //Wait till async gets to wait state i.e. inside run
+        while(!asyncLock.hasQueuedThreads());
+
+        LogCustomizer lc = createLogCustomizer(Level.INFO);
+        closer.start();
+
+        //Wait till closer is in waiting state
+        while(!async.isClosing());
+
+        //For softLimit case the flag should not be set
+        assertFalse(async.isClosed());
+        assertLogPhrase(lc.getLogs(), "[WAITING]");
+
+        //Let indexing run complete now
+        asyncLock.release();
+
+        //Wait for both threads
+        t.join();
+        closer.join();
+
+        //Close call should complete
+        assertLogPhrase(lc.getLogs(), "[CLOSED OK]");
+    }
+
+    @Test
+    public void closeWithHardLimit() throws Exception{
+        NodeStore store = new MemoryNodeStore();
+        IndexEditorProvider provider = new PropertyIndexEditorProvider();
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot").setProperty("foo", "abc");
+
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        final Semaphore asyncLock = new Semaphore(1);
+        final AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider) {
+            @Override
+            protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store, String name, long leaseTimeOut,
+                                                                 String beforeCheckpoint, String afterCheckpoint,
+                                                                 AsyncIndexStats indexStats, AtomicBoolean stopFlag) {
+                try {
+                    asyncLock.acquire();
+                } catch (InterruptedException ignore) {
+                }
+                return super.newAsyncUpdateCallback(store, name, leaseTimeOut, beforeCheckpoint, afterCheckpoint,
+                        indexStats, stopFlag);
+            }
+        };
+
+        async.setCloseTimeOut(1);
+
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                async.run();
+            }
+        });
+
+        Thread closer = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                async.close();
+            }
+        });
+
+        asyncLock.acquire();
+        t.start();
+
+        //Wait till async gets to wait state i.e. inside run
+        while(!asyncLock.hasQueuedThreads());
+
+        LogCustomizer lc = createLogCustomizer(Level.INFO);
+        closer.start();
+
+        //Wait till stopFlag is set
+        while(!async.isClosed());
+
+        assertLogPhrase(lc.getLogs(), "[SOFT LIMIT HIT]");
+
+        //Let indexing run complete now
+        asyncLock.release();
+
+        //Wait for both threads
+        t.join();
+
+        //Async run would have failed with exception
+        assertNotNull(async.getIndexStats().getLatestError());
+
+        //Wait for close call to complete
+        closer.join();
+
+        //Close call should complete with exception mode
+        assertLogPhrase(lc.getLogs(), "[CLOSED EXCEPTION]");
+    }
+
+
+    private void assertLogPhrase(List<String> logs, String logPhrase){
+        assertThat(logs.toString(), containsString(logPhrase));
+    }
+
+    private static LogCustomizer createLogCustomizer(Level level){
+        LogCustomizer lc = LogCustomizer.forLogger(AsyncIndexUpdate.class.getName()).filter(level).create();
+        lc.starting();
+        return lc;
+    }
+
 }
diff --git oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
index 17f0217..9092116 100644
--- oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
+++ oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
@@ -399,10 +399,11 @@ public class AsyncIndexUpdateLeaseTest extends OakBaseTest {
 
         @Override
         protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
-                String name, long leaseTimeOut, String checkpoint,
-                String afterCheckpoint, AsyncIndexStats indexStats) {
+                                                             String name, long leaseTimeOut, String checkpoint,
+                                                             String afterCheckpoint, AsyncIndexStats indexStats,
+                                                             AtomicBoolean stopFlag) {
             return new SpecialAsyncUpdateCallback(store, name, leaseTimeOut,
-                    checkpoint, afterCheckpoint, indexStats, listener);
+                    checkpoint, afterCheckpoint, indexStats, stopFlag, listener);
         }
     }
 
@@ -411,10 +412,9 @@ public class AsyncIndexUpdateLeaseTest extends OakBaseTest {
         private IndexStatusListener listener;
 
         public SpecialAsyncUpdateCallback(NodeStore store, String name,
-                long leaseTimeOut, String checkpoint, String afterCheckpoint,
-                AsyncIndexStats indexStats, IndexStatusListener listener) {
-            super(store, name, leaseTimeOut, checkpoint, afterCheckpoint,
-                    indexStats);
+                                          long leaseTimeOut, String checkpoint, String afterCheckpoint,
+                                          AsyncIndexStats indexStats, AtomicBoolean stopFlag, IndexStatusListener listener) {
+            super(store, name, leaseTimeOut, checkpoint, afterCheckpoint, indexStats, stopFlag);
             this.listener = listener;
         }
 
