diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
index 96ed584..0a2ca7e 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
@@ -18,6 +18,7 @@
 package org.apache.jackrabbit.oak.api.jmx;
 
 import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
 
 import aQute.bnd.annotation.ProviderType;
 import org.apache.jackrabbit.oak.commons.jmx.Description;
@@ -209,4 +210,6 @@ void registerAsyncIndexer(@Name("name") String name,
      */
     String getLatestErrorTime();
 
+    TabularData getFailingIndexStats();
+
 }
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
index 7952cb3..5d95f22 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
@@ -46,6 +46,7 @@
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.OpenType;
 import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
 
 import com.google.common.collect.Lists;
 import org.apache.jackrabbit.api.stats.TimeSeries;
@@ -53,6 +54,7 @@
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
+import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
 import org.apache.jackrabbit.oak.core.ResetCommitAttributeHook;
 import org.apache.jackrabbit.oak.core.SimpleCommitContext;
@@ -60,6 +62,7 @@
 import org.apache.jackrabbit.oak.plugins.commit.ConflictHook;
 import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdate.MissingIndexProviderStrategy;
+import org.apache.jackrabbit.oak.plugins.index.TrackingCorruptIndexHandler.CorruptIndexInfo;
 import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
 import org.apache.jackrabbit.oak.spi.commit.CommitContext;
 import org.apache.jackrabbit.oak.spi.commit.CommitHook;
@@ -196,6 +199,8 @@
 
     private List<ValidatorProvider> validatorProviders = Collections.emptyList();
 
+    private TrackingCorruptIndexHandler corruptIndexHandler = new TrackingCorruptIndexHandler();
+
     public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
                             @Nonnull IndexEditorProvider provider, boolean switchOnSync) {
         this(name, store, provider, StatisticsProvider.NOOP, switchOnSync);
@@ -569,6 +574,28 @@ private void runWhenPermitted() {
         }
     }
 
+    private void markFailingIndexesAsCorrupt(NodeBuilder builder) {
+        for (Map.Entry<String, CorruptIndexInfo> index : corruptIndexHandler.getCorruptIndexData(name).entrySet()){
+            NodeBuilder indexBuilder = childBuilder(builder, index.getKey());
+            if (!indexBuilder.hasProperty(IndexConstants.CORRUPT_PROPERTY_NAME)){
+                CorruptIndexInfo info = index.getValue();
+                String corruptSince = ISO8601.format(info.getCorruptSinceAsCal());
+                indexBuilder.setProperty(
+                        PropertyStates.createProperty(IndexConstants.CORRUPT_PROPERTY_NAME, corruptSince, Type.DATE));
+                log.info("Marking [{}] as corrupt. The index is failing {}", info.getPath(), info.getStats());
+            } else {
+                log.debug("Failing index at [{}] is already marked as corrupt. The index is failing {}");
+            }
+        }
+    }
+
+    private static NodeBuilder childBuilder(NodeBuilder nb, String path) {
+        for (String name : PathUtils.elements(checkNotNull(path))) {
+            nb = nb.child(name);
+        }
+        return nb;
+    }
+
     private void maybeCleanUpCheckpoints() {
         // clean up every five minutes
         long currentMinutes = TimeUnit.MILLISECONDS.toMinutes(
@@ -649,8 +676,10 @@ protected boolean updateIndex(NodeState before, String beforeCheckpoint,
         try {
             NodeBuilder builder = store.getRoot().builder();
 
+            markFailingIndexesAsCorrupt(builder);
+
             IndexUpdate indexUpdate =
-                    new IndexUpdate(provider, name, after, builder, callback)
+                    new IndexUpdate(provider, name, after, builder, callback, CommitInfo.EMPTY, corruptIndexHandler)
                     .withMissingProviderStrategy(missingStrategy);
             CommitFailedException exception =
                     EditorDiff.process(VisibleEditor.wrap(indexUpdate), before, after);
@@ -695,6 +724,8 @@ protected boolean updateIndex(NodeState before, String beforeCheckpoint,
                         name, indexUpdate.getReindexStats(), watch);
                 progressLogged = true;
             }
+
+            corruptIndexHandler.markWorkingIndexes(indexUpdate.getUpdatedIndexPaths());
         } finally {
             callback.close();
         }
@@ -786,7 +817,15 @@ protected AsyncIndexUpdate setCloseTimeOut(int timeOutInSec) {
     }
 
     public void setValidatorProviders(List<ValidatorProvider> validatorProviders) {
-        this.validatorProviders = validatorProviders;
+        this.validatorProviders = checkNotNull(validatorProviders);
+    }
+
+    public void setCorruptIndexHandler(TrackingCorruptIndexHandler corruptIndexHandler) {
+        this.corruptIndexHandler = checkNotNull(corruptIndexHandler);
+    }
+
+    TrackingCorruptIndexHandler getCorruptIndexHandler() {
+        return corruptIndexHandler;
     }
 
     public boolean isClosed(){
@@ -907,7 +946,7 @@ public void fixed() {
         }
 
         public boolean isFailing() {
-            return failing;
+            return failing || corruptIndexHandler.isFailing(name);
         }
 
         @Override
@@ -1163,6 +1202,11 @@ public String getLatestError() {
         public String getLatestErrorTime() {
             return latestErrorTime;
         }
+
+        @Override
+        public TabularData getFailingIndexStats() {
+            return corruptIndexHandler.getFailingIndexStats(name);
+        }
     }
 
     /**
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
index f6d9a2a..1f3829c 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
@@ -73,6 +73,15 @@
     )
     private static final String PROP_LEASE_TIME_OUT = "leaseTimeOutMinutes";
 
+    private static final long PROP_FAILING_INDEX_TIMEOUT_DEFAULT = 30 * 60;
+    @Property(
+            longValue = PROP_FAILING_INDEX_TIMEOUT_DEFAULT,
+            label = "Failing Index Timeout (s)",
+            description = "Time interval in seconds after which a failing index is considered as corrupted and " +
+                    "ignored from further indexing untill reindex. To disable this set it to 0"
+    )
+    private static final String PROP_FAILING_INDEX_TIMEOUT = "failingIndexTimeoutSeconds";
+
     private static final char CONFIG_SEP = ':';
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final WhiteboardIndexEditorProvider indexEditorProvider = new WhiteboardIndexEditorProvider();
@@ -103,11 +112,15 @@ public void activate(BundleContext bundleContext, Map<String, Object> config) {
             log.info("Detected non clusterable setup. Lease checking would be disabled for async indexing");
         }
 
+        TrackingCorruptIndexHandler corruptIndexHandler = createCorruptIndexHandler(config);
+
         for (AsyncConfig c : asyncIndexerConfig) {
             AsyncIndexUpdate task = new AsyncIndexUpdate(c.name, nodeStore, indexEditorProvider,
                     statisticsProvider, false);
+            task.setCorruptIndexHandler(corruptIndexHandler);
             task.setValidatorProviders(Collections.singletonList(validatorProvider));
             task.setLeaseTimeOut(TimeUnit.MINUTES.toMillis(leaseTimeOutMin));
+
             indexRegistration.registerAsyncIndexer(task, c.timeIntervalInSecs);
         }
         log.info("Configured async indexers {} ", asyncIndexerConfig);
@@ -123,6 +136,23 @@ public void deactivate() {
 
     //~-------------------------------------------< internal >
 
+    private TrackingCorruptIndexHandler createCorruptIndexHandler(Map<String, Object> config) {
+        long failingIndexTimeoutSeconds = PropertiesUtil.toLong(config.get(PROP_FAILING_INDEX_TIMEOUT),
+                PROP_FAILING_INDEX_TIMEOUT_DEFAULT);
+
+        TrackingCorruptIndexHandler corruptIndexHandler = new TrackingCorruptIndexHandler();
+        corruptIndexHandler.setCorruptInterval(failingIndexTimeoutSeconds, TimeUnit.SECONDS);
+
+        if (failingIndexTimeoutSeconds <= 0){
+            log.info("[{}] is set to {}. Auto corrupt index isolation handling is disabled,",
+                    PROP_FAILING_INDEX_TIMEOUT, failingIndexTimeoutSeconds);
+        } else {
+            log.info("Auto corrupt index isolation handling is enabled. Any async index which fails for [{}]s would " +
+                    "be marked as corrupted and would be skipped from further indexing");
+        }
+        return corruptIndexHandler;
+    }
+
     static List<AsyncConfig> getAsyncConfig(String[] configs) {
         List<AsyncConfig> result = Lists.newArrayList();
         for (String config : configs) {
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/CorruptIndexHandler.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/CorruptIndexHandler.java
new file mode 100644
index 0000000..c902061
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/CorruptIndexHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index;
+
+import java.util.Calendar;
+
+public interface CorruptIndexHandler {
+    CorruptIndexHandler NOOP = new CorruptIndexHandler() {
+        @Override
+        public void skippingCorruptIndex(String async, String indexPath, Calendar corruptSince) {
+
+        }
+
+        @Override
+        public void indexUpdateFailed(String async, String indexPath, Exception e) {
+
+        }
+    };
+
+    void skippingCorruptIndex(String async, String indexPath, Calendar corruptSince);
+
+    void indexUpdateFailed(String async, String indexPath, Exception e);
+}
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java
index 2ed50b4..7408eb0 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexConstants.java
@@ -77,4 +77,11 @@
      * IndexEditors
      */
     String INDEX_PATH = ":indexPath";
+
+    /**
+     * Property name for indicating that given index is corrupt and should be excluded
+     * from further indexing. Its value is the date when this index was marked as
+     * corrupt
+     */
+    String CORRUPT_PROPERTY_NAME = "corrupt";
 }
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
index 697cceb..4e27620 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdate.java
@@ -63,6 +63,7 @@
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
+import org.apache.jackrabbit.util.ISO8601;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -134,10 +135,17 @@ public IndexUpdate(
             IndexEditorProvider provider, String async,
             NodeState root, NodeBuilder builder,
             IndexUpdateCallback updateCallback, CommitInfo commitInfo) {
+        this(provider, async, root, builder, updateCallback, commitInfo, CorruptIndexHandler.NOOP);
+    }
+
+    public IndexUpdate(
+            IndexEditorProvider provider, String async,
+            NodeState root, NodeBuilder builder,
+            IndexUpdateCallback updateCallback, CommitInfo commitInfo, CorruptIndexHandler corruptIndexHandler) {
         this.parent = null;
         this.name = null;
         this.path = "/";
-        this.rootState = new IndexUpdateRootState(provider, async, root, updateCallback, commitInfo);
+        this.rootState = new IndexUpdateRootState(provider, async, root, updateCallback, commitInfo, corruptIndexHandler);
         this.builder = checkNotNull(builder);
     }
 
@@ -179,6 +187,10 @@ public boolean isReindexingPerformed(){
         return rootState.getReindexStats();
     }
 
+    public Set<String> getUpdatedIndexPaths(){
+        return rootState.getUpdatedIndexPaths();
+    }
+
     private boolean shouldReindex(NodeBuilder definition, NodeState before,
             String name) {
         PropertyState ps = definition.getProperty(REINDEX_PROPERTY_NAME);
@@ -205,10 +217,20 @@ private void collectIndexEditors(NodeBuilder definitions,
                     // probably not an index def
                     continue;
                 }
-                manageIndexPath(definition, name);
-                boolean shouldReindex = shouldReindex(definition,
-                        before, name);
+
+                boolean shouldReindex = shouldReindex(definition, before, name);
                 String indexPath = getIndexPath(getPath(), name);
+                if (definition.hasProperty(IndexConstants.CORRUPT_PROPERTY_NAME) && !shouldReindex){
+                    String corruptSince = definition.getProperty(IndexConstants.CORRUPT_PROPERTY_NAME).getValue(Type.DATE);
+                    log.warn("Ignoring corrupt index [{}] which has been marked as corrupt since [{}]. This index " +
+                                    "MUST be reindexed for indexing to work properly", indexPath,
+                            corruptSince);
+                    rootState.corruptIndexHandler.skippingCorruptIndex(rootState.async, indexPath, ISO8601.parse(corruptSince));
+                    continue;
+                }
+
+                manageIndexPath(definition, name);
+
                 Editor editor = rootState.provider.getIndexEditor(type, definition, rootState.root,
                         rootState.newCallback(indexPath, shouldReindex));
                 if (editor == null) {
@@ -229,6 +251,8 @@ private void collectIndexEditors(NodeBuilder definitions,
                                 definition.getChildNode(rm).remove();
                             }
                         }
+
+                        clearCorruptFlag(definition, indexPath);
                         reindex.put(concat(getPath(), INDEX_DEFINITIONS_NAME, name), editor);
                     }
                 } else {
@@ -364,6 +388,16 @@ public Editor childNodeDeleted(String name, NodeState before)
         return reindex.keySet();
     }
 
+    private void clearCorruptFlag(NodeBuilder definition, String indexPath) {
+        PropertyState corrupt = definition.getProperty(IndexConstants.CORRUPT_PROPERTY_NAME);
+        //Remove any corrupt property
+        if (corrupt != null) {
+            definition.removeProperty(IndexConstants.CORRUPT_PROPERTY_NAME);
+            log.info("Removing corrupt flag from index [{}] which has been marked " +
+                    "as corrupt since [{}]", indexPath, corrupt);
+        }
+    }
+
     private static String getIndexPath(String path, String indexName) {
         if (PathUtils.denotesRoot(path)) {
             return "/" + INDEX_DEFINITIONS_NAME + "/" + indexName;
@@ -439,14 +473,16 @@ public IndexUpdate withMissingProviderStrategy(
         final IndexUpdateCallback updateCallback;
         final Set<String> reindexedIndexes = Sets.newHashSet();
         final Map<String, CountingCallback> callbacks = Maps.newHashMap();
+        final CorruptIndexHandler corruptIndexHandler;
 
         private IndexUpdateRootState(IndexEditorProvider provider, String async, NodeState root,
-                                     IndexUpdateCallback updateCallback, CommitInfo commitInfo) {
+                                     IndexUpdateCallback updateCallback, CommitInfo commitInfo, CorruptIndexHandler corruptIndexHandler) {
             this.provider = checkNotNull(provider);
             this.async = async;
             this.root = checkNotNull(root);
             this.updateCallback = checkNotNull(updateCallback);
             this.commitInfo = commitInfo;
+            this.corruptIndexHandler = corruptIndexHandler;
         }
 
         public IndexUpdateCallback newCallback(String indexPath, boolean reindex) {
@@ -480,6 +516,14 @@ public String getReport() {
             return stats;
         }
 
+        public Set<String> getUpdatedIndexPaths(){
+            Set<String> indexPaths = Sets.newHashSet();
+            for (CountingCallback cb : callbacks.values()) {
+                indexPaths.add(cb.getIndexPath());
+            }
+            return indexPaths;
+        }
+
         public boolean somethingIndexed() {
             for (CountingCallback cb : callbacks.values()) {
                 if (cb.count > 0){
@@ -548,6 +592,11 @@ public boolean isReindexing() {
             public boolean isAsync() {
                 return async != null;
             }
+
+            @Override
+            public void indexUpdateFailed(Exception e) {
+                corruptIndexHandler.indexUpdateFailed(async, indexPath, e);
+            }
         }
     }
 
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java
index b03ee18..87640e7 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateProvider.java
@@ -43,6 +43,8 @@ public void indexUpdate() {
 
     private final MissingIndexProviderStrategy missingStrategy;
 
+    private CorruptIndexHandler corruptIndexHandler = CorruptIndexHandler.NOOP;
+
     public IndexUpdateProvider(IndexEditorProvider provider, boolean failOnMissingIndexProvider) {
         this(provider, null, failOnMissingIndexProvider);
     }
@@ -64,9 +66,12 @@ public Editor getRootEditor(
             NodeState before, NodeState after,
             NodeBuilder builder, CommitInfo info) {
 
-        IndexUpdate editor = new IndexUpdate(provider, async, after, builder, NOOP_CALLBACK, info)
+        IndexUpdate editor = new IndexUpdate(provider, async, after, builder, NOOP_CALLBACK, info, corruptIndexHandler)
                 .withMissingProviderStrategy(missingStrategy);
         return VisibleEditor.wrap(editor);
     }
 
+    public void setCorruptIndexHandler(CorruptIndexHandler corruptIndexHandler) {
+        this.corruptIndexHandler = corruptIndexHandler;
+    }
 }
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexingContext.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexingContext.java
index 1d158a1..59db413 100644
--- a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexingContext.java
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexingContext.java
@@ -45,4 +45,10 @@
      * asynchronously
      */
     boolean isAsync();
+
+    /**
+     * Invoked by IndexEditor to indicate that update of index has failed
+     * @param e exception stack for failed updated
+     */
+    void indexUpdateFailed(Exception e);
 }
diff --git a/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/TrackingCorruptIndexHandler.java b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/TrackingCorruptIndexHandler.java
new file mode 100644
index 0000000..9df0494
--- /dev/null
+++ b/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/TrackingCorruptIndexHandler.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index;
+
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class TrackingCorruptIndexHandler implements CorruptIndexHandler {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private Clock clock = Clock.SIMPLE;
+    private int indexerCycleCount;
+    private long corruptIntervalMillis = TimeUnit.MINUTES.toMillis(30);
+    private final Map<String, CorruptIndexInfo> indexes = Maps.newConcurrentMap();
+
+    public Map<String, CorruptIndexInfo> getCorruptIndexData(String asyncName){
+        if (corruptIntervalMillis <= 0){
+            return Collections.emptyMap();
+        }
+
+        Map<String, CorruptIndexInfo> result = Maps.newHashMap();
+        for (CorruptIndexInfo info : indexes.values()){
+            if (asyncName.equals(info.asyncName) && info.isFailingSinceLongTime()){
+                result.put(info.path, info);
+            }
+        }
+        return result;
+    }
+
+    public Map<String, CorruptIndexInfo> getFailingIndexData(String asyncName){
+        Map<String, CorruptIndexInfo> result = Maps.newHashMap();
+        for (CorruptIndexInfo info : indexes.values()){
+            if (asyncName.equals(info.asyncName)){
+                result.put(info.path, info);
+            }
+        }
+        return result;
+    }
+
+    public void markWorkingIndexes(Set<String> updatedIndexPaths) {
+        indexerCycleCount++;
+        for (String indexPath : updatedIndexPaths){
+            CorruptIndexInfo info = indexes.remove(indexPath);
+            if (info != null){
+                log.info("Index at [{}] which was so far failing {} is now working again.", info.path, info.getStats());
+            }
+        }
+    }
+
+    public boolean isFailing(String asyncName) {
+        return !getFailingIndexData(asyncName).isEmpty();
+    }
+
+    //~--------------------------------< CorruptIndexHandler >
+
+    @Override
+    public void skippingCorruptIndex(String async, String indexPath, Calendar corruptSince) {
+        getOrCreateInfo(async, indexPath).skippedIndexing(checkNotNull(corruptSince));
+    }
+
+    @Override
+    public void indexUpdateFailed(String async, String indexPath, Exception e) {
+        getOrCreateInfo(async, indexPath).addFailure(e);
+    }
+
+    //~---------------------------------< Setters >
+
+    public void setCorruptInterval(long interval, TimeUnit unit) {
+        this.corruptIntervalMillis = unit.toMillis(interval);
+    }
+
+    void setClock(Clock clock) {
+        this.clock = clock;
+    }
+
+    long getCorruptIntervalMillis() {
+        return corruptIntervalMillis;
+    }
+
+    private long getTime(){
+        return clock.getTime();
+    }
+
+    private synchronized CorruptIndexInfo getOrCreateInfo(String asyncName, String indexPath) {
+        CorruptIndexInfo info = indexes.get(indexPath);
+        if (info == null){
+            info = new CorruptIndexInfo(asyncName, indexPath);
+            indexes.put(indexPath, info);
+        }
+        return info;
+    }
+
+    public class CorruptIndexInfo {
+        private final String asyncName;
+        private final String path;
+        private final int lastIndexerCycleCount = indexerCycleCount;
+        private String exception = "";
+        private int failureCount;
+        private int skippedCount;
+        private long corruptSince;
+
+        CorruptIndexInfo(String asyncName, String path) {
+            this.asyncName = asyncName;
+            this.path = path;
+            this.corruptSince = getTime();
+        }
+
+        void addFailure(Exception e){
+            exception = Throwables.getStackTraceAsString(e);
+            failureCount++;
+        }
+
+        void skippedIndexing(Calendar corruptSince){
+            skippedCount++;
+            this.corruptSince = corruptSince.getTimeInMillis();
+        }
+
+        public boolean isFailingSinceLongTime() {
+            return getTime() - corruptSince > corruptIntervalMillis;
+        }
+
+        public String getStats() {
+            return String.format("since %tc ,%d indexing cycles, failed %d times, skipped %d time",
+                    corruptSince, getCycleCount(), failureCount, skippedCount);
+        }
+
+        public Calendar getCorruptSinceAsCal() {
+            Calendar cal = Calendar.getInstance();
+            cal.setTimeInMillis(corruptSince);
+            return cal;
+        }
+
+        public String getLastException() {
+            return exception;
+        }
+
+        public boolean isMarkedAsCorrupt(){
+            return skippedCount > 0;
+        }
+
+        public int getSkippedCount() {
+            return skippedCount;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        private int getCycleCount() {
+            return indexerCycleCount - lastIndexerCycleCount;
+        }
+    }
+
+    //~-----------------------------------------------------< MBean Support >
+
+    public TabularData getFailingIndexStats(String asyncName) {
+        TabularDataSupport tds;
+        try {
+            TabularType tt = new TabularType(TrackingCorruptIndexHandler.class.getName(),
+                    "Failing Index Stats", FailingIndexStats.TYPE, new String[]{"path"});
+            tds = new TabularDataSupport(tt);
+            Map<String, CorruptIndexInfo> infos = getFailingIndexData(asyncName);
+            for (Map.Entry<String, CorruptIndexInfo> e : infos.entrySet()) {
+                FailingIndexStats stats = new FailingIndexStats(e.getValue());
+                tds.put(stats.toCompositeData());
+            }
+        } catch (OpenDataException e) {
+            throw new IllegalStateException(e);
+        }
+        return tds;
+    }
+
+    private static class FailingIndexStats {
+        static final String[] FIELD_NAMES = new String[]{
+                "path",
+                "stats",
+                "markedCorrupt",
+                "failingSince",
+                "exception"
+        };
+
+        static final String[] FIELD_DESCRIPTIONS = new String[]{
+                "Path",
+                "Failure stats",
+                "Marked as corrupt",
+                "Failure start time",
+                "Exception"
+        };
+
+        @SuppressWarnings("rawtypes")
+        static final OpenType[] FIELD_TYPES = new OpenType[]{
+                SimpleType.STRING,
+                SimpleType.STRING,
+                SimpleType.BOOLEAN,
+                SimpleType.STRING,
+                SimpleType.STRING,
+        };
+
+        static final CompositeType TYPE = createCompositeType();
+
+        static CompositeType createCompositeType() {
+            try {
+                return new CompositeType(
+                        FailingIndexStats.class.getName(),
+                        "Composite data type for Failing Index statistics",
+                        FailingIndexStats.FIELD_NAMES,
+                        FailingIndexStats.FIELD_DESCRIPTIONS,
+                        FailingIndexStats.FIELD_TYPES);
+            } catch (OpenDataException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        private final CorruptIndexInfo info;
+
+        public FailingIndexStats(CorruptIndexInfo info){
+            this.info = info;
+        }
+
+        CompositeDataSupport toCompositeData() {
+            Object[] values = new Object[]{
+                    info.path,
+                    info.getStats(),
+                    info.isMarkedAsCorrupt(),
+                    String.format("%tc", info.getCorruptSinceAsCal().getTimeInMillis()),
+                    info.getLastException(),
+            };
+            try {
+                return new CompositeDataSupport(TYPE, FIELD_NAMES, values);
+            } catch (OpenDataException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+    }
+}
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
index 6ebab0c..86eb5a5 100644
--- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
@@ -66,6 +66,7 @@
 import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
 import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncIndexStats;
 import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.IndexTaskSpliter;
+import org.apache.jackrabbit.oak.plugins.index.TrackingCorruptIndexHandler.CorruptIndexInfo;
 import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
@@ -1604,6 +1605,127 @@ public void validatorProviderInvocation() throws Exception{
 
     }
 
+    @Test
+    public void longTimeFailingIndexMarkedAsCorrupt() throws Exception{
+        MemoryNodeStore store = new MemoryNodeStore();
+
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "fooIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "barIndex", true, false, ImmutableSet.of("bar"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("testRoot1").setProperty("foo", "abc");
+        builder.child("testRoot2").setProperty("bar", "abc");
+
+        // merge it back in
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        TestIndexEditorProvider provider = new TestIndexEditorProvider();
+
+        Clock clock = new Clock.Virtual();
+        AsyncIndexUpdate async = new AsyncIndexUpdate("async", store, provider);
+        async.getCorruptIndexHandler().setClock(clock);
+        async.run();
+
+        //1. Basic sanity check. Indexing works
+        PropertyIndexLookup lookup = new PropertyIndexLookup(store.getRoot());
+        assertEquals(ImmutableSet.of("testRoot1"), find(lookup, "foo", "abc"));
+        assertEquals(ImmutableSet.of("testRoot2"), find(lookup, "bar", "abc"));
+
+        //2. Add some new content
+        builder = store.getRoot().builder();
+        builder.child("testRoot3").setProperty("foo", "xyz");
+        builder.child("testRoot4").setProperty("bar", "xyz");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        //3. Now fail the indexing for 'bar'
+        provider.enableFailureMode("/oak:index/barIndex");
+
+        async.run();
+        assertTrue(async.getIndexStats().isFailing());
+        //barIndex is failing but not yet considered corrupted
+        assertTrue(async.getCorruptIndexHandler().getFailingIndexData("async").containsKey("/oak:index/barIndex"));
+        assertFalse(async.getCorruptIndexHandler().getCorruptIndexData("async").containsKey("/oak:index/barIndex"));
+
+        CorruptIndexInfo barIndexInfo = async.getCorruptIndexHandler().getFailingIndexData("async").get("/oak:index/barIndex");
+
+        //fooIndex is fine
+        assertFalse(async.getCorruptIndexHandler().getFailingIndexData("async").containsKey("/oak:index/fooIndex"));
+
+        //lookup should also fail as indexing failed
+        lookup = new PropertyIndexLookup(store.getRoot());
+        assertTrue(find(lookup, "foo", "xyz").isEmpty());
+        assertTrue(find(lookup, "bar", "xyz").isEmpty());
+
+        //4.Now move the clock forward and let the failing index marked as corrupt
+        clock.waitUntil(clock.getTime() + async.getCorruptIndexHandler().getCorruptIntervalMillis() + 1);
+
+        //5. Let async run again
+        async.run();
+
+        //Indexing should be ok now
+        assertFalse(async.getIndexStats().isFailing());
+
+        //barIndex should be considered corrupt now
+        assertTrue(async.getCorruptIndexHandler().getCorruptIndexData("async").containsKey("/oak:index/barIndex"));
+
+        lookup = new PropertyIndexLookup(store.getRoot());
+
+        //fooIndex should now report updated result. barIndex would fail
+        assertEquals(ImmutableSet.of("testRoot3"), find(lookup, "foo", "xyz"));
+        assertTrue(find(lookup, "bar", "xyz").isEmpty());
+        assertEquals(1, barIndexInfo.getSkippedCount());
+
+        //6. Index some stuff
+        builder = store.getRoot().builder();
+        builder.child("testRoot5").setProperty("foo", "pqr");
+        builder.child("testRoot6").setProperty("bar", "pqr");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        async.run();
+        assertFalse(async.getIndexStats().isFailing());
+
+        //barIndex should be skipped
+        assertEquals(2, barIndexInfo.getSkippedCount());
+
+        //7. Lets reindex barIndex and ensure index is not misbehaving
+        provider.disableFailureMode();
+
+        builder = store.getRoot().builder();
+        builder.child("oak:index").child("barIndex").setProperty("reindex", true);
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        async.run();
+
+        //now barIndex should not be part of failing index
+        assertFalse(async.getCorruptIndexHandler().getFailingIndexData("async").containsKey("/oak:index/barIndex"));
+    }
+
+    private static class TestIndexEditorProvider extends PropertyIndexEditorProvider {
+        private String indexPathToFail;
+        @Override
+        public Editor getIndexEditor(@Nonnull String type, @Nonnull NodeBuilder definition, @Nonnull NodeState root,
+                                     @Nonnull IndexUpdateCallback callback) {
+            IndexingContext context = ((ContextAwareCallback)callback).getIndexingContext();
+            if (indexPathToFail != null && indexPathToFail.equals(context.getIndexPath())){
+                RuntimeException e = new RuntimeException();
+                context.indexUpdateFailed(e);
+                throw e;
+            }
+            return super.getIndexEditor(type, definition, root, callback);
+        }
+
+        public void enableFailureMode(String indexPathToFail){
+            this.indexPathToFail = indexPathToFail;
+        }
+
+        public void disableFailureMode(){
+            indexPathToFail = null;
+        }
+    }
+
     private static class CollectingValidatorProvider extends ValidatorProvider {
         final Set<String> visitedPaths = Sets.newHashSet();
 
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
index f661632..b30a160 100644
--- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
@@ -142,6 +142,18 @@ public void configParsing() throws Exception{
         assertEquals(23, configs.get(1).timeIntervalInSecs);
     }
 
+    @Test
+    public void corruptIndexTimeout() throws Exception{
+        injectDefaultServices();
+        Map<String,Object> config = ImmutableMap.<String, Object>of(
+                "asyncConfigs", new String[] {"async:5"},
+                "failingIndexTimeoutSeconds" , "43"
+        );
+        MockOsgi.activate(service, context.bundleContext(), config);
+        AsyncIndexUpdate indexUpdate = getIndexUpdate("async");
+        assertEquals(TimeUnit.SECONDS.toMillis(43), indexUpdate.getCorruptIndexHandler().getCorruptIntervalMillis());
+    }
+
     private void injectDefaultServices() {
         context.registerService(StatisticsProvider.class, StatisticsProvider.NOOP);
         context.registerService(NodeStore.class, nodeStore);
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateTest.java
index bf25608..54541c8 100644
--- a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateTest.java
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/IndexUpdateTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.index;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Arrays.asList;
 import static org.apache.jackrabbit.JcrConstants.NT_BASE;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
@@ -37,13 +38,17 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.Calendar;
+import java.util.Map;
 import java.util.Set;
 
 import javax.annotation.Nonnull;
 
+import com.google.common.collect.Maps;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdate.MissingIndexProviderStrategy;
 import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup;
@@ -63,7 +68,9 @@
 import org.apache.jackrabbit.oak.spi.query.PropertyValues;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.util.ISO8601;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableSet;
@@ -512,6 +519,7 @@ public void contextAwareCallback_async() throws Exception{
     }
 
     private static class CallbackCapturingProvider extends PropertyIndexEditorProvider {
+        private Map<String, IndexingContext> callbacks = Maps.newHashMap();
         IndexUpdateCallback callback;
 
         @Override
@@ -520,9 +528,22 @@ public Editor getIndexEditor(@Nonnull String type, @Nonnull NodeBuilder definiti
             Editor editor = super.getIndexEditor(type, definition, root, callback);
             if (editor != null){
                 this.callback = callback;
+                if (callback instanceof ContextAwareCallback){
+                    IndexingContext context = ((ContextAwareCallback) callback).getIndexingContext();
+                    callbacks.put(context.getIndexPath(), context);
+                }
             }
             return editor;
         }
+
+        public void reset(){
+            callback = null;
+            callbacks.clear();
+        }
+
+        public IndexingContext getContext(String indexPath){
+            return callbacks.get(indexPath);
+        }
     }
 
 
@@ -602,4 +623,58 @@ public void testAsyncMVPDefinition() throws Exception {
         assertTrue(IndexUpdate.isIncluded("async-other", base));
     }
 
+    @Test
+    public void corruptIndexSkipped() throws Exception{
+        NodeState before = builder.getNodeState();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null);
+
+        NodeState after = builder.getNodeState();
+
+        CallbackCapturingProvider provider = new CallbackCapturingProvider();
+        EditorHook hook = new EditorHook(new IndexUpdateProvider(provider));
+
+        //1. Basic sanity - provider gets invoked
+        NodeState indexed = hook.processCommit(before, after, CommitInfo.EMPTY);
+        String indexPath = "/oak:index/rootIndex";
+        assertNotNull(provider.getContext(indexPath));
+
+
+        //2. Mark as corrupt and assert that editor is not invoked
+        builder = indexed.builder();
+        before = indexed;
+        builder.child("testRoot").setProperty("foo", "abc");
+        markCorrupt(builder, "rootIndex");
+        after = builder.getNodeState();
+
+        provider.reset();
+        indexed = hook.processCommit(before, after, CommitInfo.EMPTY);
+        assertNull(provider.getContext(indexPath));
+
+        //3. Now reindex and that should reset corrupt flag
+        builder = indexed.builder();
+        before = indexed;
+        child(builder, indexPath).setProperty(IndexConstants.REINDEX_PROPERTY_NAME, true);
+        after = builder.getNodeState();
+        provider.reset();
+        indexed = hook.processCommit(before, after, CommitInfo.EMPTY);
+
+        assertFalse(NodeStateUtils.getNode(indexed, indexPath).hasProperty(IndexConstants.CORRUPT_PROPERTY_NAME));
+        assertNotNull(provider.getContext(indexPath));
+    }
+
+
+
+    private static void markCorrupt(NodeBuilder builder, String indexName) {
+        builder.getChildNode(INDEX_DEFINITIONS_NAME).getChildNode(indexName)
+                .setProperty(IndexConstants.CORRUPT_PROPERTY_NAME, ISO8601.format(Calendar.getInstance()));
+    }
+
+    private static NodeBuilder child(NodeBuilder nb, String path){
+        for (String name : PathUtils.elements(checkNotNull(path))) {
+            nb = nb.child(name);
+        }
+        return nb;
+    }
+
 }
diff --git a/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/TrackingCorruptIndexHandlerTest.java b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/TrackingCorruptIndexHandlerTest.java
new file mode 100644
index 0000000..91d2a13
--- /dev/null
+++ b/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/TrackingCorruptIndexHandlerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TrackingCorruptIndexHandlerTest {
+
+    private TrackingCorruptIndexHandler handler = new TrackingCorruptIndexHandler();
+    private Clock clock = new Clock.Virtual();
+
+    @Test
+    public void basics() throws Exception{
+        handler.setClock(clock);
+        handler.indexUpdateFailed("async", "/oak:index/foo", new Exception());
+
+        //Index would not be considered corrupt until timeout
+        assertFalse(handler.getCorruptIndexData("async").containsKey("/oak:index/foo"));
+
+        clock.waitUntil(clock.getTime() + handler.getCorruptIntervalMillis() + 1);
+        assertTrue(handler.getCorruptIndexData("async").containsKey("/oak:index/foo"));
+
+        //Should only be visible for "async"
+        assertFalse(handler.getCorruptIndexData("async-fulltext").containsKey("/oak:index/foo"));
+
+        handler.markWorkingIndexes(Collections.singleton("/oak:index/foo"));
+        assertFalse(handler.getCorruptIndexData("async").containsKey("/oak:index/foo"));
+    }
+
+    @Test
+    public void disbaled() throws Exception{
+        handler.setClock(clock);
+        handler.indexUpdateFailed("async", "/oak:index/foo", new Exception());
+
+        clock.waitUntil(clock.getTime() + handler.getCorruptIntervalMillis() + 1);
+        assertTrue(handler.getCorruptIndexData("async").containsKey("/oak:index/foo"));
+
+        handler.setCorruptInterval(0, TimeUnit.SECONDS);
+
+        //With timeout set to zero no corrupt index should be reported
+        assertFalse(handler.getCorruptIndexData("async").containsKey("/oak:index/foo"));
+    }
+
+}
\ No newline at end of file
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java
index 876fbf0..21d28dc 100644
--- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditor.java
@@ -184,8 +184,10 @@ public void leave(NodeState before, NodeState after)
             try {
                 context.closeWriter();
             } catch (IOException e) {
-                throw new CommitFailedException("Lucene", 4,
-                        "Failed to close the Lucene index", e);
+                CommitFailedException ce = new CommitFailedException("Lucene", 4,
+                        "Failed to close the Lucene index " + context.getIndexingContext().getIndexPath(), e);
+                context.getIndexingContext().indexUpdateFailed(ce);
+                throw ce;
             }
             if (context.getIndexedNodes() > 0) {
                 log.debug("[{}] => Indexed {} nodes, done.", getIndexName(), context.getIndexedNodes());
@@ -249,9 +251,10 @@ public Editor childNodeDeleted(String name, NodeState before)
                 writer.deleteDocuments(path);
                 this.context.indexUpdate();
             } catch (IOException e) {
-                throw new CommitFailedException("Lucene", 5,
-                        "Failed to remove the index entries of"
-                                + " the removed subtree " + path, e);
+                CommitFailedException ce = new CommitFailedException("Lucene", 5, "Failed to remove the index entries of"
+                                + " the removed subtree " + path + "for index " + context.getIndexingContext().getIndexPath(), e);
+                context.getIndexingContext().indexUpdateFailed(ce);
+                throw ce;
             }
         }
 
@@ -279,8 +282,10 @@ private boolean addOrUpdate(String path, NodeState state, boolean isUpdate)
                 return true;
             }
         } catch (IOException e) {
-            throw new CommitFailedException("Lucene", 3,
+            CommitFailedException ce = new CommitFailedException("Lucene", 3,
                     "Failed to index the node " + path, e);
+            context.getIndexingContext().indexUpdateFailed(ce);
+            throw ce;
         } catch (IllegalArgumentException ie) {
             log.warn("Failed to index the node [{}]", path, ie);
         }
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java
index 207e2a7..745889f 100644
--- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorContext.java
@@ -29,6 +29,7 @@
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
+import org.apache.jackrabbit.oak.plugins.index.IndexingContext;
 import org.apache.jackrabbit.oak.plugins.index.lucene.util.FacetHelper;
 import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriter;
 import org.apache.jackrabbit.oak.plugins.index.lucene.writer.LuceneIndexWriterFactory;
@@ -85,6 +86,8 @@
 
     private final NodeState root;
 
+    private final IndexingContext indexingContext;
+
     private final boolean asyncIndexing;
     /**
      * The media types supported by the parser used.
@@ -101,9 +104,10 @@
                              LuceneIndexWriterFactory indexWriterFactory,
                              ExtractedTextCache extractedTextCache,
                              IndexAugmentorFactory augmentorFactory,
-                             boolean asyncIndexing) {
+                             IndexingContext indexingContext, boolean asyncIndexing) {
         configureUniqueId(definition);
         this.root = root;
+        this.indexingContext = checkNotNull(indexingContext);
         this.definitionBuilder = definition;
         this.indexWriterFactory = indexWriterFactory;
         this.definition = indexDefinition != null ? indexDefinition : new IndexDefinition(root, definition);
@@ -133,6 +137,10 @@ LuceneIndexWriter getWriter() throws IOException {
         return writer;
     }
 
+    public IndexingContext getIndexingContext() {
+        return indexingContext;
+    }
+
     /**
      * close writer if it's not null
      */
diff --git a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
index 7482157..d771778 100644
--- a/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
+++ b/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProvider.java
@@ -151,7 +151,7 @@ public Editor getIndexEditor(
             }
 
             LuceneIndexEditorContext context = new LuceneIndexEditorContext(root, definition, indexDefinition, callback,
-                    writerFactory, extractedTextCache, augmentorFactory, asyncIndexing);
+                    writerFactory, extractedTextCache, augmentorFactory, indexingContext, asyncIndexing);
             return new LuceneIndexEditor(context);
         }
         return null;
diff --git a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTrackerTest.java b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTrackerTest.java
index b85b919..016dd50 100644
--- a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTrackerTest.java
+++ b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexTrackerTest.java
@@ -23,9 +23,11 @@
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ImmutableSet;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.index.IndexUpdateProvider;
+import org.apache.jackrabbit.oak.plugins.index.TrackingCorruptIndexHandler;
 import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
 import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
@@ -33,6 +35,7 @@
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.plugins.index.lucene.BadIndexTracker.BadIndexInfo;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
@@ -43,12 +46,12 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("UnusedAssignment")
 public class IndexTrackerTest {
-    private static final EditorHook HOOK = new EditorHook(
-            new IndexUpdateProvider(
-                    new LuceneIndexEditorProvider()));
+    private TrackingCorruptIndexHandler corruptIndexHandler = new TrackingCorruptIndexHandler();
+    private EditorHook hook;
 
     private NodeState root = INITIAL_CONTENT;
 
@@ -56,16 +59,24 @@
 
     private IndexTracker tracker = new IndexTracker();
 
+    @Before
+    public void setUp(){
+        IndexUpdateProvider updateProvider = new IndexUpdateProvider(
+                new LuceneIndexEditorProvider(), "async", false);
+        updateProvider.setCorruptIndexHandler(corruptIndexHandler);
+        hook = new EditorHook(updateProvider);
+    }
+
     @Test
     public void update() throws Exception{
         NodeBuilder index = builder.child(INDEX_DEFINITIONS_NAME);
-        newLucenePropertyIndexDefinition(index, "lucene", ImmutableSet.of("foo"), null);
+        newLucenePropertyIndexDefinition(index, "lucene", ImmutableSet.of("foo"), "async");
 
         NodeState before = builder.getNodeState();
         builder.setProperty("foo", "bar");
         NodeState after = builder.getNodeState();
 
-        NodeState indexed = HOOK.processCommit(before, after, CommitInfo.EMPTY);
+        NodeState indexed = hook.processCommit(before, after, CommitInfo.EMPTY);
 
         assertEquals(0, tracker.getIndexNodePaths().size());
 
@@ -91,7 +102,7 @@ public void badIndexAccess() throws Exception{
         builder.setProperty("foo", "bar");
         NodeState after = builder.getNodeState();
 
-        NodeState indexed = HOOK.processCommit(before, after, CommitInfo.EMPTY);
+        NodeState indexed = hook.processCommit(before, after, CommitInfo.EMPTY);
         tracker.update(indexed);
 
         IndexNode indexNode = tracker.acquireIndexNode("/oak:index/foo");
@@ -152,7 +163,7 @@ public void badIndexAccess() throws Exception{
         before = indexed;
         after = reindex("/oak:index/foo");
 
-        indexed = HOOK.processCommit(before, after, CommitInfo.EMPTY);
+        indexed = hook.processCommit(before, after, CommitInfo.EMPTY);
         tracker.update(indexed);
 
         //7. Now indexNode should be accessible
@@ -164,6 +175,35 @@ public void badIndexAccess() throws Exception{
         assertNull(badIdxInfo);
     }
 
+    @Test
+    public void notifyFailedIndexing() throws Exception{
+        createIndex("foo");
+
+        //1. Create and populate index
+        NodeState before = builder.getNodeState();
+        builder.setProperty("foo", "bar");
+        NodeState after = builder.getNodeState();
+
+        NodeState indexed = hook.processCommit(before, after, CommitInfo.EMPTY);
+        tracker.update(indexed);
+
+        builder = indexed.builder();
+        indexed = corruptIndex("/oak:index/foo");
+
+        builder = indexed.builder();
+        builder.setProperty("foo", "bar2");
+        after = builder.getNodeState();
+
+        try {
+            hook.processCommit(before, after, CommitInfo.EMPTY);
+            fail("Indexing should have failed");
+        } catch (CommitFailedException ignore){
+
+        }
+
+        assertTrue(corruptIndexHandler.getFailingIndexData("async").containsKey("/oak:index/foo"));
+    }
+
     private NodeState corruptIndex(String indexPath) {
         NodeBuilder dir = TestUtil.child(builder, PathUtils.concat(indexPath, ":data"));
         for (String name : dir.getChildNodeNames()){
@@ -183,7 +223,7 @@ private NodeState reindex(String indexPath){
 
     private void createIndex(String propName){
         NodeBuilder index = builder.child(INDEX_DEFINITIONS_NAME);
-        newLucenePropertyIndexDefinition(index, propName, ImmutableSet.of(propName), null);
+        newLucenePropertyIndexDefinition(index, propName, ImmutableSet.of(propName), "async");
     }
 
 }
\ No newline at end of file
diff --git a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java
index 099d335..5306289 100644
--- a/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java
+++ b/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexEditorProviderTest.java
@@ -166,6 +166,11 @@ public boolean isAsync() {
         }
 
         @Override
+        public void indexUpdateFailed(Exception e) {
+
+        }
+
+        @Override
         public void indexUpdate() throws CommitFailedException {
 
         }
