Index: oak-jcr/pom.xml
===================================================================
--- oak-jcr/pom.xml (revision 1761407)
+++ oak-jcr/pom.xml (working copy)
@@ -268,7 +268,7 @@
org.apache.jackrabbit
jackrabbit-api
- ${jackrabbit.version}
+ 2.13.4-SNAPSHOT
org.apache.jackrabbit
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (revision 1761407)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java (working copy)
@@ -233,11 +233,11 @@
if (queueSize == queueLength) {
if (commitRateLimiter != null) {
if (!blocking) {
- LOG.warn("Revision queue is full. Further commits will be blocked.");
+ onQueueFull("Revision queue is full. Further commits will be blocked.");
}
commitRateLimiter.blockCommits();
} else if (!blocking) {
- LOG.warn("Revision queue is full. Further revisions will be compacted.");
+ onQueueFull("Revision queue is full. Further revisions will be compacted.");
}
blocking = true;
} else {
@@ -282,6 +282,17 @@
};
}
+ /**
+ * Called when the queue is full. This implementation will log the message
+ * at WARN level.
+ *
+ * @param message a reason message.
+ * @param arguments arguments for the message.
+ */
+ protected void onQueueFull(String message, Object... arguments) {
+ LOG.warn(message, arguments);
+ }
+
private final Monitor runningMonitor = new Monitor();
private final RunningGuard running = new RunningGuard(runningMonitor);
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedChangeProcessor.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedChangeProcessor.java (nonexistent)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedChangeProcessor.java (working copy)
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.jcr.observation;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.jackrabbit.api.observation.ConsolidatedChanges;
+import org.apache.jackrabbit.commons.observation.ListenerTracker;
+import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.namepath.NamePathMapper;
+import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.stats.StatisticManager;
+
+/**
+ * A change processor for listeners that implement {@link ConsolidatedChanges}.
+ */
+class ConsolidatedChangeProcessor extends ChangeProcessor {
+
+ public ConsolidatedChangeProcessor(ContentSession contentSession,
+ NamePathMapper namePathMapper,
+ ListenerTracker tracker,
+ FilterProvider filter,
+ StatisticManager statisticManager) {
+ super(contentSession, namePathMapper, tracker,
+ filter, statisticManager, 1, null);
+ }
+
+ @Override
+ public void contentChanged(@Nonnull NodeState root,
+ @Nullable CommitInfo info) {
+ // do not pass commit info and allow change processor
+ // to consolidate even local changes
+ super.contentChanged(root, null);
+ }
+
+ @Override
+ protected void onQueueFull(String message, Object... arguments) {
+ // do nothing
+ }
+}
Property changes on: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ConsolidatedChangeProcessor.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (revision 1761407)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java (working copy)
@@ -41,6 +41,7 @@
import javax.jcr.observation.EventListener;
import javax.jcr.observation.EventListenerIterator;
+import org.apache.jackrabbit.api.observation.ConsolidatedChanges;
import org.apache.jackrabbit.api.observation.JackrabbitEventFilter;
import org.apache.jackrabbit.api.observation.JackrabbitObservationManager;
import org.apache.jackrabbit.commons.iterator.EventListenerIteratorAdapter;
@@ -145,12 +146,7 @@
if (processor == null) {
LOG.debug(OBSERVATION,
"Registering event listener {} with filter {}", listener, filterProvider);
- // TODO sharing the namePathMapper across different thread might lead to lock contention.
- // If this turns out to be problematic we might create a dedicated snapshot for each
- // session. See OAK-1368.
- processor = new ChangeProcessor(sessionDelegate.getContentSession(), namePathMapper,
- tracker, filterProvider, statisticManager, queueLength,
- commitRateLimiter);
+ processor = createChangeProcessor(listener, tracker, filterProvider);
processors.put(listener, processor);
processor.start(whiteboard);
} else {
@@ -160,6 +156,23 @@
}
}
+ private ChangeProcessor createChangeProcessor(EventListener listener,
+ ListenerTracker tracker,
+ FilterProvider filterProvider) {
+ // TODO sharing the namePathMapper across different thread might lead to lock contention.
+ // If this turns out to be problematic we might create a dedicated snapshot for each
+ // session. See OAK-1368.
+ if (listener instanceof ConsolidatedChanges) {
+ return new ConsolidatedChangeProcessor(
+ sessionDelegate.getContentSession(), namePathMapper,
+ tracker, filterProvider, statisticManager);
+ } else {
+ return new ChangeProcessor(sessionDelegate.getContentSession(),
+ namePathMapper, tracker, filterProvider, statisticManager,
+ queueLength, commitRateLimiter);
+ }
+ }
+
/**
* Adds an event listener that listens for the events specified
* by the {@code filterProvider} passed to this method.