From dc9ca862155a5abf78b168821eb3a815c4625c07 Mon Sep 17 00:00:00 2001 From: Vikas Saurabh Date: Wed, 15 Feb 2017 09:37:50 +0530 Subject: [PATCH] OAK-5626: ChangeProcessor doesn't reset 'blocking' flag when items from queue gets removed and commit-rate-limiter is null ChangeProcessor would now log WARN each time queue gets full. To avoid flooding of logs, consecutive WARNs would be avoided a breathing period (Default 10 minutes. Can be configured by JVM command line param: "oak.observation.full-queue.warn.interval"). During the breathing period, queue full logs would still be logged at DEBUG level. Also, add test to check the behavior. --- .../oak/jcr/observation/ChangeProcessor.java | 27 ++++++- .../observation/ObservationQueueFullWarnTest.java | 83 +++++++++++++++++++++- 2 files changed, 106 insertions(+), 4 deletions(-) diff --git a/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java b/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java index 092fa04..27a0be4 100644 --- a/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java +++ b/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java @@ -113,7 +113,15 @@ class ChangeProcessor implements FilteringAwareObserver { * kicks in. */ public static final int MAX_DELAY; - + + /** + * Number of milliseconds to wait before issuing consecutive queue full warn messages + * Controlled by command line property "oak.observation.full-queue.warn.interval". + * Note, the command line parameter is wait interval in minutes. + */ + static long QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer + .getInteger("oak.observation.full-queue.warn.interval", 10)); + // OAK-4533: make DELAY_THRESHOLD and MAX_DELAY adjustable - using System.properties for now static { final String delayThresholdStr = System.getProperty("oak.commitRateLimiter.delayThreshold"); @@ -294,6 +302,8 @@ class ChangeProcessor implements FilteringAwareObserver { private volatile long delay; private volatile boolean blocking; + private long lastQueueFullWarnTimestamp = -1; + @Override protected void added(int newQueueSize) { queueSizeChanged(newQueueSize); @@ -310,11 +320,11 @@ class ChangeProcessor implements FilteringAwareObserver { if (newQueueSize >= queueLength) { if (commitRateLimiter != null) { if (!blocking) { - LOG.warn("Revision queue is full. Further commits will be blocked."); + logQueueFullWarning(); } commitRateLimiter.blockCommits(); } else if (!blocking) { - LOG.warn("Revision queue is full. Further revisions will be compacted."); + logQueueFullWarning(); } blocking = true; } else { @@ -346,11 +356,22 @@ class ChangeProcessor implements FilteringAwareObserver { commitRateLimiter.unblockCommits(); blocking = false; } + } else { + blocking = false; } } } } + private void logQueueFullWarning() { + long currTime = System.currentTimeMillis(); + if (lastQueueFullWarnTimestamp + QUEUE_FULL_WARN_INTERVAL < currTime) { + LOG.warn("Revision queue is full. Further revisions will be compacted."); + lastQueueFullWarnTimestamp = currTime; + } else { + LOG.debug("Revision queue is full. Further revisions will be compacted."); + } + } @Override public String toString() { diff --git a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java index 7aeb2b4..7c2297f 100644 --- a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java +++ b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.java @@ -42,6 +42,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import static javax.jcr.observation.Event.NODE_ADDED; import static org.junit.Assert.assertTrue; @@ -54,6 +55,8 @@ public class ObservationQueueFullWarnTest extends AbstractRepositoryTest { private static final String TEST_NODE_TYPE = "oak:Unstructured"; private static final String TEST_PATH = '/' + TEST_NODE; + private static final long CONDITION_TIMEOUT = 10*1000; + private Session observingSession; private ObservationManager observationManager; @@ -105,6 +108,65 @@ public class ObservationQueueFullWarnTest extends AbstractRepositoryTest { } } + @Test + public void warnOnRepeatedQueueFull() throws RepositoryException, InterruptedException, ExecutionException { + LogCustomizer warnLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName()) + .filter(Level.WARN) + .create(); + LogCustomizer debugLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName()) + .enable(Level.DEBUG) + .filter(Level.DEBUG) + .create(); + + long oldWarnLogInterval = ChangeProcessor.QUEUE_FULL_WARN_INTERVAL; + + final LoggingListener listener = new LoggingListener(); + observationManager.addEventListener(listener, NODE_ADDED, TEST_PATH, true, null, null, false); + try { + Node n = getAdminSession().getNode(TEST_PATH); + int nodeNameCounter = 0; + + nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener); + + //Let queue empty up a bit. + listener.waitFor(CONDITION_TIMEOUT, new Condition() { + @Override + public boolean evaluate() { + return listener.numAdded >= 2; + } + }); + + //Assumption is that 10 minutes won't get passed by the time we'll attempt the second queue fill. + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(10); + + warnLogs.starting(); + debugLogs.starting(); + nodeNameCounter = addNodeToFillObsQueue(n, nodeNameCounter, listener); + assertTrue("Observation queue full warning must not logged until some time has past since last log", warnLogs.getLogs().size() == 0); + assertTrue("Observation queue full warning should get logged on debug though in the mean time", debugLogs.getLogs().size() > 0); + warnLogs.finished(); + debugLogs.finished(); + + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = TimeUnit.SECONDS.toMillis(1); + Thread.sleep(1000); + + warnLogs.starting(); + debugLogs.starting(); + addNodeToFillObsQueue(n, nodeNameCounter, listener); + assertTrue("Observation queue full warning must get logged after some time has past since last log", warnLogs.getLogs().size() > 0); + warnLogs.finished(); + debugLogs.finished(); + } + finally { + observationManager.removeEventListener(listener); + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = oldWarnLogInterval; + } + } + + private interface Condition { + boolean evaluate(); + } + private static int addNodeToFillObsQueue(Node parent, int nodeNameCounter, LoggingListener listener) throws RepositoryException { listener.blockObservation.acquireUninterruptibly(); @@ -121,15 +183,34 @@ public class ObservationQueueFullWarnTest extends AbstractRepositoryTest { private class LoggingListener implements EventListener { + private volatile int numAdded = 0; + Semaphore blockObservation = new Semaphore(1); @Override - public void onEvent(EventIterator events) { + public synchronized void onEvent(EventIterator events) { blockObservation.acquireUninterruptibly(); while (events.hasNext()) { events.nextEvent(); + numAdded++; } blockObservation.release(); + + notifyAll(); + } + + synchronized boolean waitFor(long timeout, Condition c) + throws InterruptedException { + long end = System.currentTimeMillis() + timeout; + long remaining = end - System.currentTimeMillis(); + while (remaining > 0) { + if (c.evaluate()) { + return true; + } + wait(remaining); + remaining = end - System.currentTimeMillis(); + } + return false; } } } -- 2.8.3