Index: oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java (revision 1710559) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java (working copy) @@ -134,6 +134,13 @@ CompositeData getExecutionTime(); /** + * Returns the number of indexed nodes as a {@link org.apache.jackrabbit.api.stats.TimeSeries}. + * + * @return the indexed nodes time series + */ + CompositeData getIndexedNodesCount(); + + /** * Returns the consolidated execution stats since last reset * @return consolidated execution stats */ @@ -162,4 +169,34 @@ */ void registerAsyncIndexer(@Name("name") String name, @Name("delayInSeconds") long delayInSeconds); + + /** + * @return true if the indexing job is failing + */ + boolean isFailing(); + + /** + * @return The time the indexing job stared failing, or {@code ""} if the + * job is not currently failing. + */ + String getFailingSince(); + + /** + * @return the number of consecutive failed executions or {@code 0} if the + * job is not currently failing. + */ + long getConsecutiveFailedExecutions(); + + /** + * @return the latest indexing error seen, will not be reset once the job + * starts working again + */ + String getLatestError(); + + /** + * @return the time when the latest indexing error has been seen, will not + * be reset once the job starts working again + */ + String getLatestErrorTime(); + } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java (revision 1710559) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java (working copy) @@ -15,7 +15,7 @@ * limitations under the License. */ -@Version("2.0.0") +@Version("3.0.0") @Export(optional = "provide:=true") package org.apache.jackrabbit.oak.api.jmx; Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (revision 1710559) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (working copy) @@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.plugins.index; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Throwables.getStackTraceAsString; import static com.google.common.collect.Sets.newHashSet; import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_DONE; import static org.apache.jackrabbit.oak.commons.PathUtils.elements; @@ -42,11 +43,6 @@ import javax.management.openmbean.OpenType; import javax.management.openmbean.SimpleType; -import com.google.common.base.Objects; -import com.google.common.base.Splitter; -import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableMap; - import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; @@ -73,6 +69,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Objects; +import com.google.common.base.Splitter; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; + public class AsyncIndexUpdate implements Runnable { private static final Logger log = LoggerFactory @@ -121,9 +122,6 @@ private final long lifetime = DEFAULT_LIFETIME; // TODO: make configurable - /** Flag to avoid repeatedly logging failure warnings */ - private boolean failing = false; - private final AsyncIndexStats indexStats = new AsyncIndexStats(); /** Flag to switch to synchronous updates once the index caught up to the repo */ @@ -148,6 +146,14 @@ private IndexMBeanRegistration mbeanRegistration; + /** + * Controls the length of the interval (in minutes) at which an indexing + * error is logged as 'warning'. for the rest of the indexing cycles errors + * will be logged at 'debug' level + */ + private static long ERROR_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer + .getInteger("oak.async.warn.interval", 30)); + public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store, @Nonnull IndexEditorProvider provider, boolean switchOnSync) { this.name = checkNotNull(name); @@ -177,8 +183,6 @@ /** Expiration time of the last lease we committed */ private long lease; - private long updates = 0; - private final String leaseName; public AsyncUpdateCallback(String checkpoint, String afterCheckpoint) @@ -201,7 +205,7 @@ mergeWithConcurrencyCheck(builder, checkpoint, beforeLease); // reset updates counter - indexStats.setUpdates(this.updates); + indexStats.resetUpdates(); } private void updateTempCheckpoints(NodeBuilder async, @@ -229,7 +233,7 @@ } boolean isDirty() { - return updates > 0; + return indexStats.getUpdates() > 0; } void close() throws CommitFailedException { @@ -241,9 +245,7 @@ @Override public void indexUpdate() throws CommitFailedException { - updates++; - if (updates % 100 == 0) { - indexStats.setUpdates(this.updates); + if (indexStats.incUpdates() % 100 == 0) { long now = System.currentTimeMillis(); if (now + ASYNC_TIMEOUT > lease) { long newLease = now + 2 * ASYNC_TIMEOUT; @@ -334,9 +336,8 @@ after, afterCheckpoint, afterTime); // the update succeeded, i.e. it no longer fails - if (failing) { - log.info("[{}] Index update no longer fails", name); - failing = false; + if (indexStats.isFailing()) { + indexStats.fixed(); } // the update succeeded, so we can release the earlier checkpoint @@ -347,15 +348,8 @@ indexStats.setProcessedCheckpoint(""); indexStats.releaseTempCheckpoint(afterCheckpoint); - } catch (CommitFailedException e) { - if (e == CONCURRENT_UPDATE) { - log.debug("[{}] Concurrent update detected in the index update", name); - } else if (failing) { - log.debug("[{}] The index update is still failing", name, e); - } else { - log.warn("[{}] The index update failed", name, e); - failing = true; - } + } catch (Exception e) { + indexStats.failed(e); } finally { if (threadNameChanged) { @@ -451,9 +445,9 @@ String msg = "[{}] AsyncIndex update run completed in {}. Indexed {} nodes"; //Log at info level if time taken is more than 5 min if (watch.elapsed(TimeUnit.MINUTES) >= 5) { - log.info(msg, name, watch, callback.updates); + log.info(msg, name, watch, indexStats.getUpdates()); } else { - log.debug(msg, name, watch, callback.updates); + log.debug(msg, name, watch, indexStats.getUpdates()); } } @@ -533,6 +527,15 @@ private final Stopwatch watch = Stopwatch.createUnstarted(); private final ExecutionStats execStats = new ExecutionStats(); + /** Flag to avoid repeatedly logging failure warnings */ + private volatile boolean failing = false; + private long latestErrorWarn = 0; + + private String failingSince = ""; + private String latestError = null; + private String latestErrorTime = ""; + private long consecutiveFailures = 0; + public void start(String now) { status = STATUS_RUNNING; start = now; @@ -555,6 +558,43 @@ watch.reset(); } + public void failed(Exception e) { + latestError = getStackTraceAsString(e); + latestErrorTime = now(); + consecutiveFailures++; + if (!failing) { + // first occurrence of a failure + failing = true; + // reusing value so value display is consistent + failingSince = latestErrorTime; + latestErrorWarn = System.currentTimeMillis(); + log.warn("[{}] The index update failed", name, e); + } else { + // subsequent occurrences + boolean warn = System.currentTimeMillis() - latestErrorWarn > ERROR_WARN_INTERVAL; + if (warn) { + latestErrorWarn = System.currentTimeMillis(); + log.warn("[{}] The index update is still failing", name, e); + } else { + log.debug("[{}] The index update is still failing", name, e); + } + } + } + + public void fixed() { + log.info("[{}] Index update no longer fails", name); + failing = false; + failingSince = ""; + consecutiveFailures = 0; + latestErrorWarn = 0; + latestError = null; + latestErrorTime = ""; + } + + public boolean isFailing() { + return failing; + } + @Override public String getStart() { return start; @@ -593,8 +633,13 @@ return this.isPaused; } - void setUpdates(long updates) { - this.updates = updates; + void resetUpdates() { + this.updates = 0; + } + + long incUpdates() { + updates++; + return updates; } @Override @@ -644,6 +689,11 @@ } @Override + public CompositeData getIndexedNodesCount() { + return execStats.getIndexedNodesCount(); + } + + @Override public CompositeData getConsolidatedExecutionStats() { return execStats.getConsolidatedStats(); } @@ -657,9 +707,12 @@ public String toString() { return "AsyncIndexStats [start=" + start + ", done=" + done + ", status=" + status + ", paused=" + isPaused + + ", failing=" + failing + ", failingSince=" + failingSince + + ", consecutiveFailures=" + consecutiveFailures + ", updates=" + updates + ", referenceCheckpoint=" + referenceCp + ", processedCheckpoint=" + processedCp - + " ,tempCheckpoints=" + tempCps + " ]"; + + " ,tempCheckpoints=" + tempCps + ", latestErrorTime=" + + latestErrorTime + ", latestError=" + latestError + " ]"; } @Override @@ -670,6 +723,7 @@ private class ExecutionStats { private final TimeSeriesRecorder execCounter; private final TimeSeriesRecorder execTimer; + private final TimeSeriesRecorder indexedNodesCounter; /** * Captures consolidated execution stats since last reset @@ -683,6 +737,7 @@ private ExecutionStats() { execCounter = new TimeSeriesRecorder(true); execTimer = new TimeSeriesRecorder(true); + indexedNodesCounter = new TimeSeriesRecorder(true); try { consolidatedType = new CompositeType("ConsolidatedStats", @@ -701,6 +756,7 @@ private void recordExecution(long time, long updates) { execTimer.getCounter().addAndGet(time); + indexedNodesCounter.getCounter().addAndGet(updates); consolidatedExecTime.addAndGet(time); consolidatedNodes.addAndGet(updates); } @@ -713,6 +769,10 @@ return TimeSeriesStatsUtil.asCompositeData(execTimer, "ExecutionTime"); } + private CompositeData getIndexedNodesCount() { + return TimeSeriesStatsUtil.asCompositeData(indexedNodesCounter, "ExecutionNodesCount"); + } + private CompositeData getConsolidatedStats() { try { Long[] values = new Long[]{consolidatedExecRuns.longValue(), @@ -733,6 +793,7 @@ private void recordTick() { execCounter.recordOneSecond(); execTimer.recordOneSecond(); + indexedNodesCounter.recordOneSecond(); } } @@ -751,6 +812,26 @@ public void registerAsyncIndexer(String name, long delayInSeconds) { taskSplitter.registerAsyncIndexer(name, delayInSeconds); } + + @Override + public String getFailingSince() { + return failingSince; + } + + @Override + public long getConsecutiveFailedExecutions() { + return consecutiveFailures; + } + + @Override + public String getLatestError() { + return latestError; + } + + @Override + public String getLatestErrorTime() { + return latestErrorTime; + } } /** @@ -808,7 +889,7 @@ } public boolean isFailing() { - return failing; + return indexStats.isFailing(); } class IndexTaskSpliter { Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (revision 1710559) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (working copy) @@ -25,6 +25,7 @@ import static org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider.TYPE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -34,17 +35,18 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.management.openmbean.CompositeData; -import ch.qos.logback.classic.Level; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; import org.apache.jackrabbit.oak.commons.junit.LogCustomizer; +import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncIndexStats; import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.IndexTaskSpliter; import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider; import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup; @@ -61,6 +63,8 @@ import org.apache.jackrabbit.oak.spi.state.NodeStore; import org.junit.Test; +import ch.qos.logback.classic.Level; + import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -559,6 +563,25 @@ provider.isFailed()); assertTrue("Expecting no checkpoints", store.listCheckpoints().size() == 0); + + // OAK-3054 failure reports + AsyncIndexStats stats = async.getIndexStats(); + String since = stats.getFailingSince(); + assertTrue(stats.isFailing()); + assertEquals(1, stats.getConsecutiveFailedExecutions()); + assertEquals(since, stats.getLatestErrorTime()); + + TimeUnit.MILLISECONDS.sleep(100); + async.run(); + assertTrue(stats.isFailing()); + assertEquals(2, stats.getConsecutiveFailedExecutions()); + assertEquals(since, stats.getFailingSince()); + assertNotEquals(since, stats.getLatestErrorTime()); + + stats.fixed(); + assertFalse(stats.isFailing()); + assertEquals(0, stats.getConsecutiveFailedExecutions()); + assertEquals("", stats.getFailingSince()); } /**