Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java (revision 1755600) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation/CommitRateLimiter.java (working copy) @@ -21,6 +21,8 @@ import static org.apache.jackrabbit.oak.api.CommitFailedException.OAK; +import java.util.concurrent.atomic.AtomicInteger; + import javax.annotation.Nonnull; import org.apache.jackrabbit.oak.api.CommitFailedException; @@ -40,6 +42,15 @@ private volatile boolean blockCommits; private volatile long delay; + // the observation call depth of the current thread + // (only updated by the current thread, so technically isn't necessary that + // this is an AtomicInteger, but it's simpler to use it) + private static ThreadLocal NON_BLOCKING_LEVEL = + new ThreadLocal(); + + private static boolean EXCEPTION_ON_BLOCK = + Boolean.getBoolean("oak.commitRateLimiter.exceptionOnBlock"); + /** * Block any further commits until {@link #unblockCommits()} is called. */ @@ -54,10 +65,14 @@ blockCommits = false; } + public boolean getBlockCommits() { + return blockCommits; + } + /** - * Number of milli seconds to delay commits going through this hook. + * Number of milliseconds to delay commits going through this hook. * If {@code 0}, any currently blocked commit will be unblocked. - * @param delay milli seconds + * @param delay milliseconds */ public void setDelay(long delay) { if (LOG.isTraceEnabled()) { @@ -78,15 +93,32 @@ @Override public NodeState processCommit(NodeState before, NodeState after, CommitInfo info) throws CommitFailedException { - if (blockCommits) { - throw new CommitFailedException(OAK, 1, "System busy. 
Try again later."); + if (blockCommits && isThreadBlocking()) { + blockCommit(); + } else { + delay(); } - delay(); return after; } + + public void blockCommit() throws CommitFailedException { + if (EXCEPTION_ON_BLOCK) { + throw new CommitFailedException(OAK, 1, "System busy. Try again later."); + } + synchronized (this) { + try { + while (getBlockCommits()) { + wait(1000); + } + } catch (InterruptedException e) { + throw new CommitFailedException(OAK, 2, + "Interrupted while waiting to commit", e); + } + } + } - private void delay() throws CommitFailedException { - if (delay > 0) { + protected void delay() throws CommitFailedException { + if (delay > 0 && isThreadBlocking()) { synchronized (this) { try { long t0 = Clock.ACCURATE.getTime(); @@ -102,4 +134,44 @@ } } } + + /** + * The current thread will now run code that must not be throttled or + * blocked, such as processing events (EventListener.onEvent is going to be + * called). + */ + public void beforeNonBlocking() { + AtomicInteger value = NON_BLOCKING_LEVEL.get(); + if (value == null) { + value = new AtomicInteger(1); + NON_BLOCKING_LEVEL.set(value); + } else { + value.incrementAndGet(); + } + } + + /** + * The current thread finished running code that must not be throttled or + * blocked. + */ + public void afterNonBlocking() { + AtomicInteger value = NON_BLOCKING_LEVEL.get(); + if (value == null) { + // TODO should not happen (log an error?) + } else { + value.decrementAndGet(); + } + } + + /** + * Check whether the current thread is non-blocking. 
+ * + * @return {@code true} if the current thread may be blocked or delayed, {@code false} while it runs non-blocking code + */ + public boolean isThreadBlocking() { + AtomicInteger value = NON_BLOCKING_LEVEL.get(); + // no delay while processing events + return value == null || value.get() == 0; + } + } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java (revision 1755600) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java (working copy) @@ -94,6 +94,7 @@ private static class ContentChange { private final NodeState root; private final CommitInfo info; + ContentChange(NodeState root, CommitInfo info) { this.root = root; this.info = info; @@ -129,6 +130,7 @@ ContentChange change = queue.poll(); if (change != null && change != STOP) { observer.contentChanged(change.root, change.info); + removed(queue.size()); currentTask.onComplete(completionHandler); } } catch (Throwable t) { @@ -182,11 +184,18 @@ /** * Called when ever an item has been added to the queue - * @param queueSize size of the queue + * @param newQueueSize size of the queue afterwards */ - protected void added(int queueSize) { } + protected void added(int newQueueSize) { } /** + * Called whenever an item has been removed from the queue. 
+ * + * @param newQueueSize the size of the queue afterwards + */ + protected void removed(int newQueueSize) { } + + /** * @return The max queue length used for this observer's queue */ public int getMaxQueueLength() { @@ -209,6 +218,7 @@ queue.clear(); queue.add(STOP); stopped = true; + removed(queue.size()); } @Nonnull Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java =================================================================== --- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (revision 1755600) +++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (working copy) @@ -131,7 +131,7 @@ private final AtomicReference filterProvider; private final MeterStats eventCount; private final TimerStats eventDuration; - private final TimeSeriesMax maxQueueLength; + private final TimeSeriesMax maxQueueLengthRecorder; private final int queueLength; private final CommitRateLimiter commitRateLimiter; @@ -162,7 +162,7 @@ filterProvider = new AtomicReference(filter); this.eventCount = statisticManager.getMeter(OBSERVATION_EVENT_COUNTER); this.eventDuration = statisticManager.getTimer(OBSERVATION_EVENT_DURATION); - this.maxQueueLength = statisticManager.maxQueLengthRecorder(); + this.maxQueueLengthRecorder = statisticManager.maxQueLengthRecorder(); this.queueLength = queueLength; this.commitRateLimiter = commitRateLimiter; } @@ -226,11 +226,19 @@ private volatile boolean blocking; @Override - protected void added(int queueSize) { - maxQueueLength.recordValue(queueSize); - tracker.recordQueueLength(queueSize); - - if (queueSize == queueLength) { + protected void added(int newQueueSize) { + queueSizeChanged(newQueueSize); + } + + @Override + protected void removed(int newQueueSize) { + queueSizeChanged(newQueueSize); + } + + private void queueSizeChanged(int newQueueSize) { + maxQueueLengthRecorder.recordValue(newQueueSize); + 
tracker.recordQueueLength(newQueueSize); + if (newQueueSize >= queueLength) { if (commitRateLimiter != null) { if (!blocking) { LOG.warn("Revision queue is full. Further commits will be blocked."); @@ -241,7 +249,7 @@ } blocking = true; } else { - double fillRatio = (double) queueSize / queueLength; + double fillRatio = (double) newQueueSize / queueLength; if (fillRatio > DELAY_THRESHOLD) { if (commitRateLimiter != null) { if (delay == 0) { @@ -338,11 +346,17 @@ provider.getSubTrees(), Filters.all(filter, VISIBLE_FILTER)); if (events.hasNext() && runningMonitor.enterIf(running)) { + if (commitRateLimiter != null) { + commitRateLimiter.beforeNonBlocking(); + } try { CountingIterator countingEvents = new CountingIterator(events); eventListener.onEvent(countingEvents); countingEvents.updateCounters(eventCount, eventDuration); } finally { + if (commitRateLimiter != null) { + commitRateLimiter.afterNonBlocking(); + } runningMonitor.leave(); } } Index: oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/SlowObservationIT.java =================================================================== --- oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/SlowObservationIT.java (revision 0) +++ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/SlowObservationIT.java (working copy) @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.jcr.observation; + +import static javax.jcr.observation.Event.NODE_ADDED; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jcr.Node; +import javax.jcr.RepositoryException; +import javax.jcr.Session; +import javax.jcr.observation.Event; +import javax.jcr.observation.EventIterator; +import javax.jcr.observation.EventListener; +import javax.jcr.observation.ObservationManager; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.fixture.NodeStoreFixture; +import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest; +import org.apache.jackrabbit.oak.jcr.Jcr; +import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; +import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter; +import org.h2.util.Profiler; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A slow test case that tries to test the commit rate limiter. 
+ */ +@RunWith(Parameterized.class) +// Don't run "Parallelized" as this causes tests to timeout in "weak" environments +public class SlowObservationIT extends AbstractRepositoryTest { + + static final Logger LOG = LoggerFactory.getLogger(SlowObservationIT.class); + + private static final int OBSERVER_COUNT = 3; + + private static final boolean NO_DELAY_JUST_BLOCK = false; + + private static final boolean PROFILE = Boolean.getBoolean("oak.profile"); + + private static final String TEST_NODE = "test_node"; + private static final String TEST_PATH = '/' + TEST_NODE; + private static final String TEST2_NODE = "test_node2"; + private static final String TEST2_PATH = '/' + TEST2_NODE; + + public SlowObservationIT(NodeStoreFixture fixture) { + super(fixture); + } + + @Override + protected Jcr initJcr(Jcr jcr) { + CommitRateLimiter limiter = new CommitRateLimiter() { + + long lastLog; + + @Override + public void setDelay(long delay) { + long now = System.currentTimeMillis(); + if (now > lastLog + 1000) { + log("Delay " + delay); + lastLog = now; + } + super.setDelay(delay); + } + + @Override + protected void delay() throws CommitFailedException { + if (!NO_DELAY_JUST_BLOCK) { + // default behavior + super.delay(); + return; + } + if (getBlockCommits() && isThreadBlocking()) { + synchronized (this) { + try { + while (getBlockCommits()) { + wait(1000); + } + } catch (InterruptedException e) { + throw new CommitFailedException( + CommitFailedException.OAK, 2, "Interrupted while waiting to commit", e); + } + } + } + } + + }; + return super.initJcr(jcr).with(limiter); + } + + @Before + public void setup() throws RepositoryException { + if (!isDocumentNodeStore()) { + return; + } + Session session = getAdminSession(); + + Node nodetypeIndex = session.getRootNode().getNode("oak:index").getNode("nodetype"); + nodetypeIndex.remove(); + + Node testNode; + testNode = session.getRootNode().addNode(TEST_NODE, "oak:Unstructured"); + testNode.setProperty("test", 0); + testNode = 
session.getRootNode().addNode(TEST2_NODE, "oak:Unstructured"); + testNode.setProperty("test", 0); + session.save(); + } + + private boolean isDocumentNodeStore() { + // SegmentNodeStore can result in deadlocks, because + // the CommitRateLimiter may be blocking inside a synchronized block + return fixture.toString().indexOf("DocumentNodeStore") >= 0; + } + + @Test + public void observation() throws Exception { + if (!isDocumentNodeStore()) { + return; + } + AtomicBoolean saveInObservation = new AtomicBoolean(); + saveInObservation.set(true); + ArrayList listeners = new ArrayList(); + for (int i = 0; i < OBSERVER_COUNT; i++) { + Session observingSession = createAdminSession(); + MyListener listener = new MyListener(i, observingSession, saveInObservation); + listener.open(); + listeners.add(listener); + } + log("Starting..."); + Profiler prof = null; + long start = System.currentTimeMillis(); + for (int i = 1;; i++) { + if (prof == null && PROFILE) { + // prof = new Profiler().startCollecting(); + } + long time = System.currentTimeMillis() - start; + if (time > 20 * 1000) { + if (saveInObservation.get()) { + log("Disable saves in observation now"); + saveInObservation.set(false); + } + } + if (time > 30 * 1000) { + break; + } + Node testNode; + if (i % 100 < 52) { + // in 52% of the cases, use testNode + testNode = getNode(TEST_PATH); + } else { + // in 48% of the cases, use testNode2 + testNode = getNode(TEST2_PATH); + } + String a = "c-" + (i / 40); + String b = "c-" + (i % 40); + Node x; + if (testNode.hasNode(a)) { + x = testNode.getNode(a); + } else { + x = testNode.addNode(a, "oak:Unstructured"); + } + Node t = x.addNode(b, "oak:Unstructured"); + for (int j = 0; j < 10; j++) { + t.addNode("c-" + j, "oak:Unstructured"); + } + long saveTime = System.currentTimeMillis(); + getAdminSession().save(); + saveTime = System.currentTimeMillis() - saveTime; + if (saveTime > 100 || i % 200 == 0) { + if (prof != null) { + log(prof.getTop(1)); + prof = null; + } + log("Save 
#" + i + " took " + saveTime + " ms"); + } + } + log("Stopping..."); + for (MyListener listener : listeners) { + listener.stop(); + listener.waitUntilDone(); + listener.close(); + } + log("Done"); + if (PROFILE) { + printFullThreadDump(); + } + } + + //------------------------------------------------------------< private >--- + + private Node getNode(String path) throws RepositoryException { + return getAdminSession().getNode(path); + } + + /** + * A simple listener that writes in 10% of the cases. + */ + private static class MyListener implements EventListener { + + private final AtomicBoolean saveInObservation; + private final int id; + private final Session session; + private ObservationManager observationManager; + private volatile boolean stopped; + private volatile boolean done; + private Exception exception; + + MyListener(int id, Session session, AtomicBoolean saveInObservation) { + this.id = id; + this.session = session; + this.saveInObservation = saveInObservation; + } + + public void open() throws RepositoryException { + observationManager = session.getWorkspace().getObservationManager(); + observationManager.addEventListener(this, NODE_ADDED, "/" + TEST_NODE, true, null, null, false); + } + + public void close() throws RepositoryException { + observationManager.removeEventListener(this); + } + + public void stop() { + stopped = true; + } + + public void waitUntilDone() throws Exception { + while (!done) { + synchronized (this) { + wait(1000); + } + } + if (exception != null) { + throw exception; + } + } + + @Override + public void onEvent(EventIterator events) { + while (!stopped && events.hasNext()) { + Event event = events.nextEvent(); + try { + String path = event.getPath(); + // Thread.currentThread().setName("Observer path " + path); + // System.out.println("observer " + path); + if (Math.abs(path.hashCode() % OBSERVER_COUNT) != id) { + // if it's not for "my" observer, ignore + continue; + } + if (Math.random() > 0.5) { + // do nothing 50% of the 
time + continue; + } + // else save something as well: 5 times setProperty + if (saveInObservation.get()) { + if (session.getRootNode().hasNode(path.substring(1))) { + Node n = session.getNode(path); + // System.out.println("observer save "+ path); + for (int i = 0; i < 5; i++) { + n.setProperty("x", i); + session.save(); + } + } + } + } catch (Exception e) { + log("Error " + e); + LOG.error("Observation listener error", e); + exception = e; + } + } + done = true; + synchronized (this) { + notifyAll(); + } + } + } + + static void log(String message) { + if (PROFILE) { + System.out.println(message); + } + LOG.info(message); + } + + public static void printFullThreadDump() { + log(new Timestamp(System.currentTimeMillis()).toString() + .substring(0, 19)); + log("Full thread dump " + + System.getProperty("java.vm.name") + + " (" + System.getProperty("java.vm.version") + "):"); + log(""); + for (Entry e : Thread.getAllStackTraces() + .entrySet()) { + Thread t = e.getKey(); + log(String.format("\"%s\"%s prio=%d tid=0x%x", + t.getName(), + t.isDaemon() ? " daemon" : "", + t.getPriority(), + t.getId())); + log(" java.lang.Thread.State: " + t.getState()); + for (StackTraceElement s : e.getValue()) { + log("\tat " + s); + } + log(""); + } + } + +}