Index: oak-jcr/pom.xml =================================================================== --- oak-jcr/pom.xml (revision 1761407) +++ oak-jcr/pom.xml (working copy) @@ -268,7 +268,7 @@ org.apache.jackrabbit jackrabbit-api - ${jackrabbit.version} + 2.13.4-SNAPSHOT org.apache.jackrabbit Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java =================================================================== --- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (revision 1761407) +++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (working copy) @@ -233,11 +233,11 @@ if (queueSize == queueLength) { if (commitRateLimiter != null) { if (!blocking) { - LOG.warn("Revision queue is full. Further commits will be blocked."); + onQueueFull("Revision queue is full. Further commits will be blocked."); } commitRateLimiter.blockCommits(); } else if (!blocking) { - LOG.warn("Revision queue is full. Further revisions will be compacted."); + onQueueFull("Revision queue is full. Further revisions will be compacted."); } blocking = true; } else { @@ -282,6 +282,17 @@ }; } + /** + * Called when the queue is full. This implementation will log the message + * at WARN level. + * + * @param message a reason message. + * @param arguments arguments for the message. + */ + protected void onQueueFull(String message, Object... arguments) { + LOG.warn(message, arguments); + } + private final Monitor runningMonitor = new Monitor(); private final RunningGuard running = new RunningGuard(runningMonitor); Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedChangeProcessor.java =================================================================== --- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedChangeProcessor.java (nonexistent) +++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedChangeProcessor.java (working copy) @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.jcr.observation; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import org.apache.jackrabbit.api.observation.ConsolidatedChanges; +import org.apache.jackrabbit.commons.observation.ListenerTracker; +import org.apache.jackrabbit.oak.api.ContentSession; +import org.apache.jackrabbit.oak.namepath.NamePathMapper; +import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.apache.jackrabbit.oak.stats.StatisticManager; + +/** + * A change processor for listeners that implement {@link ConsolidatedChanges}. + */ +class ConsolidatedChangeProcessor extends ChangeProcessor { + + public ConsolidatedChangeProcessor(ContentSession contentSession, + NamePathMapper namePathMapper, + ListenerTracker tracker, + FilterProvider filter, + StatisticManager statisticManager) { + super(contentSession, namePathMapper, tracker, + filter, statisticManager, 1, null); + } + + @Override + public void contentChanged(@Nonnull NodeState root, + @Nullable CommitInfo info) { + // do not pass commit info and allow change processor + // to consolidate even local changes + super.contentChanged(root, null); + } + + @Override + protected void onQueueFull(String message, Object... arguments) { + // do nothing + } +} Property changes on: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedChangeProcessor.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java =================================================================== --- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (revision 1761407) +++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (working copy) @@ -41,6 +41,7 @@ import javax.jcr.observation.EventListener; import javax.jcr.observation.EventListenerIterator; +import org.apache.jackrabbit.api.observation.ConsolidatedChanges; import org.apache.jackrabbit.api.observation.JackrabbitEventFilter; import org.apache.jackrabbit.api.observation.JackrabbitObservationManager; import org.apache.jackrabbit.commons.iterator.EventListenerIteratorAdapter; @@ -145,12 +146,7 @@ if (processor == null) { LOG.debug(OBSERVATION, "Registering event listener {} with filter {}", listener, filterProvider); - // TODO sharing the namePathMapper across different thread might lead to lock contention. - // If this turns out to be problematic we might create a dedicated snapshot for each - // session. See OAK-1368. - processor = new ChangeProcessor(sessionDelegate.getContentSession(), namePathMapper, - tracker, filterProvider, statisticManager, queueLength, - commitRateLimiter); + processor = createChangeProcessor(listener, tracker, filterProvider); processors.put(listener, processor); processor.start(whiteboard); } else { @@ -160,6 +156,23 @@ } } + private ChangeProcessor createChangeProcessor(EventListener listener, + ListenerTracker tracker, + FilterProvider filterProvider) { + // TODO sharing the namePathMapper across different thread might lead to lock contention. + // If this turns out to be problematic we might create a dedicated snapshot for each + // session. See OAK-1368. + if (listener instanceof ConsolidatedChanges) { + return new ConsolidatedChangeProcessor( + sessionDelegate.getContentSession(), namePathMapper, + tracker, filterProvider, statisticManager); + } else { + return new ChangeProcessor(sessionDelegate.getContentSession(), + namePathMapper, tracker, filterProvider, statisticManager, + queueLength, commitRateLimiter); + } + } + /** * Adds an event listener that listens for the events specified * by the {@code filterProvider} passed to this method.