Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java (revision 1754409) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java (working copy) @@ -21,6 +21,8 @@ import static org.apache.jackrabbit.oak.api.CommitFailedException.OAK; +import java.util.concurrent.atomic.AtomicInteger; + import javax.annotation.Nonnull; import org.apache.jackrabbit.oak.api.CommitFailedException; @@ -40,6 +42,15 @@ private volatile boolean blockCommits; private volatile long delay; + // the observation call depth of the current thread + // (only updated by the current thread, so technically isn't necessary that + // this is an AtomicInteger, but it's simpler to use it) + private static ThreadLocal NON_BLOCKING_LEVEL = + new ThreadLocal(); + + private static boolean EXCEPTION_ON_BLOCK = + Boolean.getBoolean("oak.commitRateLimiter.exceptionOnBlock"); + /** * Block any further commits until {@link #unblockCommits()} is called. */ @@ -54,10 +65,14 @@ blockCommits = false; } + public boolean getBlockCommits() { + return blockCommits; + } + /** - * Number of milli seconds to delay commits going through this hook. + * Number of milliseconds to delay commits going through this hook. * If {@code 0}, any currently blocked commit will be unblocked. - * @param delay milli seconds + * @param delay milliseconds */ public void setDelay(long delay) { if (LOG.isTraceEnabled()) { @@ -78,15 +93,32 @@ @Override public NodeState processCommit(NodeState before, NodeState after, CommitInfo info) throws CommitFailedException { - if (blockCommits) { - throw new CommitFailedException(OAK, 1, "System busy. Try again later."); + if (blockCommits && isThreadBlocking()) { + blockCommit(); + } else { + delay(); } - delay(); return after; } + + public void blockCommit() throws CommitFailedException { + if (EXCEPTION_ON_BLOCK) { + throw new CommitFailedException(OAK, 1, "System busy. Try again later."); + } + synchronized (this) { + try { + while (getBlockCommits()) { + wait(1000); + } + } catch (InterruptedException e) { + throw new CommitFailedException(OAK, 2, + "Interrupted while waiting to commit", e); + } + } + } - private void delay() throws CommitFailedException { - if (delay > 0) { + protected void delay() throws CommitFailedException { + if (delay > 0 && isThreadBlocking()) { synchronized (this) { try { long t0 = Clock.ACCURATE.getTime(); @@ -102,4 +134,44 @@ } } } + + /** + * The current thread will now run code that must not be throttled or + * blocked, such as processing events (EventListener.onEvent is going to be + * called). + */ + public void beforeNonBlocking() { + AtomicInteger value = NON_BLOCKING_LEVEL.get(); + if (value == null) { + value = new AtomicInteger(1); + NON_BLOCKING_LEVEL.set(value); + } else { + value.incrementAndGet(); + } + } + + /** + * The current thread finished running code that must not be throttled or + * blocked. + */ + public void afterNonBlocking() { + AtomicInteger value = NON_BLOCKING_LEVEL.get(); + if (value == null) { + // TODO should not happen (log an error?) + } else { + value.decrementAndGet(); + } + } + + /** + * Check whether the current thread is non-blocking. + * + * @return whether thread thread is non-blocking + */ + public boolean isThreadBlocking() { + AtomicInteger value = NON_BLOCKING_LEVEL.get(); + // no delay while processing events + return value == null || value.get() == 0; + } + } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java (revision 1754409) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java (working copy) @@ -94,6 +94,7 @@ private static class ContentChange { private final NodeState root; private final CommitInfo info; + private final long created = System.currentTimeMillis(); ContentChange(NodeState root, CommitInfo info) { this.root = root; this.info = info; @@ -129,6 +130,7 @@ ContentChange change = queue.poll(); if (change != null && change != STOP) { observer.contentChanged(change.root, change.info); + removed(queue.size(), change.created); currentTask.onComplete(completionHandler); } } catch (Throwable t) { @@ -187,6 +189,15 @@ protected void added(int queueSize) { } /** + * Called when ever an item has been removed from the queue. + * + * @param queueSize the size of the queue after the item was removed. + * @param created the time in milliseconds when the removed item was put + * into the queue. + */ + protected void removed(int queueSize, long created) { } + + /** * @return The max queue length used for this observer's queue */ public int getMaxQueueLength() { Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java =================================================================== --- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (revision 1754409) +++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (working copy) @@ -229,8 +229,19 @@ protected void added(int queueSize) { maxQueueLength.recordValue(queueSize); tracker.recordQueueLength(queueSize); - - if (queueSize == queueLength) { + queueSizeChanged(queueSize); + } + + // requires Jackrabbit version 2.13.2-SNAPSHOT or newer: + // @Override + // protected void removed(int queueSize, long created) { + // maxQueueLength.recordValue(queueSize); + // tracker.recordQueueLength(queueSize, created); + // queueSizeChanged(queueSize); + // } + + private void queueSizeChanged(int queueSize) { + if (queueSize >= queueLength) { if (commitRateLimiter != null) { if (!blocking) { LOG.warn("Revision queue is full. Further commits will be blocked."); @@ -338,11 +349,17 @@ provider.getSubTrees(), Filters.all(filter, VISIBLE_FILTER)); if (events.hasNext() && runningMonitor.enterIf(running)) { + if (commitRateLimiter != null) { + commitRateLimiter.beforeNonBlocking(); + } try { CountingIterator countingEvents = new CountingIterator(events); eventListener.onEvent(countingEvents); countingEvents.updateCounters(eventCount, eventDuration); } finally { + if (commitRateLimiter != null) { + commitRateLimiter.afterNonBlocking(); + } runningMonitor.leave(); } }