Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java (revision 1760486) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/util/MongoConnection.java (working copy) @@ -17,6 +17,7 @@ package org.apache.jackrabbit.oak.plugins.document.util; import java.net.UnknownHostException; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; @@ -34,6 +35,7 @@ */ public class MongoConnection { + private static final int DEFAULT_MAX_WAIT_TIME = (int) TimeUnit.MINUTES.toMillis(1); private static final WriteConcern WC_UNKNOWN = new WriteConcern("unknown"); private final MongoClientURI mongoURI; private final MongoClient mongo; @@ -98,6 +100,7 @@ public static MongoClientOptions.Builder getDefaultBuilder() { return new MongoClientOptions.Builder() .description("MongoConnection for Oak DocumentMK") + .maxWaitTime(DEFAULT_MAX_WAIT_TIME) .threadsAllowedToBlockForConnectionMultiplier(100); } Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java =================================================================== --- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (revision 1760486) +++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (working copy) @@ -226,6 +226,15 @@ private volatile boolean blocking; @Override + public void contentChanged(@Nonnull NodeState root, + @Nullable CommitInfo info) { + if (queueLength <= 1) { + info = null; + } + super.contentChanged(root, info); + } + + @Override protected void added(int queueSize) { maxQueueLength.recordValue(queueSize); tracker.recordQueueLength(queueSize); @@ -237,7 +246,10 @@ } commitRateLimiter.blockCommits(); } else if (!blocking) { - LOG.warn("Revision queue is full. Further revisions will be compacted."); + // do not log if queue length is set to one + if (queueLength > 1) { + LOG.warn("Revision queue is full. Further revisions will be compacted."); + } } blocking = true; } else { Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java =================================================================== --- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (revision 1760486) +++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (working copy) @@ -138,8 +138,14 @@ } } + private void addEventListener(EventListener listener, + ListenerTracker tracker, + FilterProvider filterProvider) { + addEventListener(listener, tracker, filterProvider, queueLength); + } + private synchronized void addEventListener(EventListener listener, ListenerTracker tracker, - FilterProvider filterProvider) { + FilterProvider filterProvider, int queueLength) { ChangeProcessor processor = processors.get(listener); if (processor == null) { @@ -150,7 +156,7 @@ // session. See OAK-1368. processor = new ChangeProcessor(sessionDelegate.getContentSession(), namePathMapper, tracker, filterProvider, statisticManager, queueLength, - commitRateLimiter); + queueLength > 1 ? commitRateLimiter : null); processors.put(listener, processor); processor.start(whiteboard); } else { @@ -216,6 +222,8 @@ boolean noLocal = filter.getNoLocal(); boolean noExternal = filter.getNoExternal() || listener instanceof ExcludeExternal; boolean noInternal = filter.getNoInternal(); + boolean consolidate = false; // filter.getConsolidateChanges(); + int queueLen = consolidate ? 1 : queueLength; Set includePaths = getOakPaths(namePathMapper, filter.getAdditionalPaths()); String absPath = filter.getAbsPath(); if (absPath != null) { @@ -256,7 +264,7 @@ ListenerTracker tracker = new WarningListenerTracker( !noExternal, listener, eventTypes, absPath, isDeep, uuids, nodeTypeName, noLocal); - addEventListener(listener, tracker, filterBuilder.build()); + addEventListener(listener, tracker, filterBuilder.build(), queueLen); } private static List createExclusions(FilterBuilder filterBuilder, Iterable excludedPaths) {