Index: src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (revision 441748)
+++ src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (working copy)
@@ -37,7 +37,7 @@
import org.apache.jackrabbit.core.nodetype.NodeTypeRegistry;
import org.apache.jackrabbit.core.nodetype.virtual.VirtualNodeTypeStateManager;
import org.apache.jackrabbit.core.observation.DelegatingObservationDispatcher;
-import org.apache.jackrabbit.core.observation.ObservationManagerFactory;
+import org.apache.jackrabbit.core.observation.ObservationDispatcher;
import org.apache.jackrabbit.core.security.AuthContext;
import org.apache.jackrabbit.core.state.ItemStateException;
import org.apache.jackrabbit.core.state.PMContext;
@@ -712,12 +712,12 @@
return getWorkspaceInfo(workspaceName).getItemStateProvider();
}
- ObservationManagerFactory getObservationManagerFactory(String workspaceName)
+ ObservationDispatcher getObservationDispatcher(String workspaceName)
throws NoSuchWorkspaceException {
// check sanity of this instance
sanityCheck();
- return getWorkspaceInfo(workspaceName).getObservationManagerFactory();
+ return getWorkspaceInfo(workspaceName).getObservationDispatcher();
}
/**
@@ -1294,7 +1294,7 @@
/**
* observation manager factory (instantiated on init)
*/
- private ObservationManagerFactory obsMgrFactory;
+ private ObservationDispatcher dispatcher;
/**
* system session (lazily instantiated)
@@ -1457,13 +1457,13 @@
*
* @return the observation manager factory for this workspace
*/
- ObservationManagerFactory getObservationManagerFactory() {
+ ObservationDispatcher getObservationDispatcher() {
if (!isInitialized()) {
throw new IllegalStateException("workspace '" + getName()
+ "' not initialized");
}
- return obsMgrFactory;
+ return dispatcher;
}
/**
@@ -1623,10 +1623,10 @@
throw new RepositoryException(msg, ise);
}
- obsMgrFactory = new ObservationManagerFactory();
+ dispatcher = new ObservationDispatcher();
// register the observation factory of that workspace
- delegatingDispatcher.addDispatcher(obsMgrFactory);
+ delegatingDispatcher.addDispatcher(dispatcher);
initialized = true;
@@ -1691,11 +1691,11 @@
log.info("shutting down workspace '" + getName() + "'...");
// deregister the observation factory of that workspace
- delegatingDispatcher.removeDispatcher(obsMgrFactory);
+ delegatingDispatcher.removeDispatcher(dispatcher);
// dispose observation manager factory
- obsMgrFactory.dispose();
- obsMgrFactory = null;
+ dispatcher.dispose();
+ dispatcher = null;
// shutdown search managers
if (searchMgr != null) {
Index: src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java (revision 0)
+++ src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java (revision 0)
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.observation;
+
+import org.apache.commons.collections.Buffer;
+import org.apache.commons.collections.BufferUtils;
+import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
+import org.apache.jackrabbit.core.state.ChangeLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Dispatcher for dispatching events to listeners within a single workspace.
+ */
+public final class ObservationDispatcher extends EventDispatcher
+ implements Runnable {
+
+ /**
+ * Logger instance for this class
+ */
+ private static final Logger log
+ = LoggerFactory.getLogger(ObservationDispatcher.class);
+
+ /**
+ * Dummy DispatchAction indicating the notification thread to end
+ */
+ private static final DispatchAction DISPOSE_MARKER = new DispatchAction(null, null);
+
+ /**
+ * Currently active EventConsumers for notification.
+ */
+ private Set activeConsumers = new HashSet();
+
+ /**
+ * Currently active synchronous EventConsumers for notification.
+ */
+ private Set synchronousConsumers = new HashSet();
+
+ /**
+ * Set of EventConsumers for read only Set access
+ */
+ private Set readOnlyConsumers;
+
+ /**
+ * Set of synchronous EventConsumers for read only Set access.
+ */
+ private Set synchronousReadOnlyConsumers;
+
+ /**
+ * synchronization monitor for listener changes
+ */
+ private Object consumerChange = new Object();
+
+ /**
+ * Contains the pending events that will be delivered to event listeners
+ */
+ private Buffer eventQueue
+ = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
+
+ /**
+ * The background notification thread
+ */
+ private Thread notificationThread;
+
+ /**
+ * Creates a new ObservationDispatcher instance
+ * and starts the notification thread deamon.
+ */
+ public ObservationDispatcher() {
+ notificationThread = new Thread(this, "ObservationManager");
+ notificationThread.setDaemon(true);
+ notificationThread.start();
+ }
+
+ /**
+ * Disposes this ObservationManager. This will
+ * effectively stop the background notification thread.
+ */
+ public void dispose() {
+ // dispatch dummy event to mark end of notification
+ eventQueue.add(DISPOSE_MARKER);
+ try {
+ notificationThread.join();
+ } catch (InterruptedException e) {
+ // FIXME log exception ?
+ }
+ log.info("Notification of EventListeners stopped.");
+ }
+
+ /**
+ * Returns an unmodifieable Set of EventConsumers.
+ *
+ * @return Set of EventConsumers.
+ */
+ Set getAsynchronousConsumers() {
+ synchronized (consumerChange) {
+ if (readOnlyConsumers == null) {
+ readOnlyConsumers = Collections.unmodifiableSet(new HashSet(activeConsumers));
+ }
+ return readOnlyConsumers;
+ }
+ }
+
+ Set getSynchronousConsumers() {
+ synchronized (consumerChange) {
+ if (synchronousReadOnlyConsumers == null) {
+ synchronousReadOnlyConsumers = Collections.unmodifiableSet(new HashSet(synchronousConsumers));
+ }
+ return synchronousReadOnlyConsumers;
+ }
+ }
+
+ /**
+ * Implements the run method of the background notification
+ * thread.
+ */
+ public void run() {
+ DispatchAction action;
+ while ((action = (DispatchAction) eventQueue.remove()) != DISPOSE_MARKER) {
+
+ log.debug("got EventStateCollection");
+ log.debug("event delivery to " + action.getEventConsumers().size() + " consumers started...");
+ for (Iterator it = action.getEventConsumers().iterator(); it.hasNext();) {
+ EventConsumer c = (EventConsumer) it.next();
+ try {
+ c.consumeEvents(action.getEventStates());
+ } catch (Throwable t) {
+ log.warn("EventConsumer threw exception: " + t.toString());
+ log.debug("Stacktrace: ", t);
+ // move on to the next consumer
+ }
+ }
+ log.debug("event delivery finished.");
+
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
EventConsumer to add or replace.
+ */
+ void addConsumer(EventConsumer consumer) {
+ synchronized (consumerChange) {
+ if (consumer.getEventListener() instanceof SynchronousEventListener) {
+ // remove existing if any
+ synchronousConsumers.remove(consumer);
+ // re-add it
+ synchronousConsumers.add(consumer);
+ // reset read only consumer set
+ synchronousReadOnlyConsumers = null;
+ } else {
+ // remove existing if any
+ activeConsumers.remove(consumer);
+ // re-add it
+ activeConsumers.add(consumer);
+ // reset read only consumer set
+ readOnlyConsumers = null;
+ }
+ }
+ }
+
+ /**
+ * Unregisters an event consumer from event notification.
+ * @param consumer the consumer to deregister.
+ */
+ void removeConsumer(EventConsumer consumer) {
+ synchronized (consumerChange) {
+ if (consumer.getEventListener() instanceof SynchronousEventListener) {
+ synchronousConsumers.remove(consumer);
+ // reset read only listener set
+ synchronousReadOnlyConsumers = null;
+ } else {
+ activeConsumers.remove(consumer);
+ // reset read only listener set
+ readOnlyConsumers = null;
+ }
+ }
+ }
+
+}
Property changes on: src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java
___________________________________________________________________
Name: svn:eol-style
+ native
Index: src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java (revision 441748)
+++ src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java (working copy)
@@ -53,9 +53,9 @@
private final ItemManager itemMgr;
/**
- * The ObservationManagerFactory
+ * The observation dispatcher
*/
- private final ObservationManagerFactory obsMgrFactory;
+ private final ObservationDispatcher dispatcher;
static {
// preload EventListenerIteratorImpl to prevent classloader issues during shutdown
@@ -72,9 +72,9 @@
* @throws NullPointerException if session or itemMgr
* is null.
*/
- ObservationManagerImpl(ObservationManagerFactory obsMgrFactory,
- SessionImpl session,
- ItemManager itemMgr) throws NullPointerException {
+ public ObservationManagerImpl(
+ ObservationDispatcher dispatcher, SessionImpl session,
+ ItemManager itemMgr) throws NullPointerException {
if (session == null) {
throw new NullPointerException("session");
}
@@ -82,7 +82,7 @@
throw new NullPointerException("itemMgr");
}
- this.obsMgrFactory = obsMgrFactory;
+ this.dispatcher = dispatcher;
this.session = session;
this.itemMgr = itemMgr;
}
@@ -138,7 +138,7 @@
EventConsumer consumer =
new EventConsumer(session, listener, filter);
- obsMgrFactory.addConsumer(consumer);
+ dispatcher.addConsumer(consumer);
}
/**
@@ -148,7 +148,7 @@
throws RepositoryException {
EventConsumer consumer =
new EventConsumer(session, listener, EventFilter.BLOCK_ALL);
- obsMgrFactory.removeConsumer(consumer);
+ dispatcher.removeConsumer(consumer);
}
@@ -158,8 +158,8 @@
public EventListenerIterator getRegisteredEventListeners()
throws RepositoryException {
return new EventListenerIteratorImpl(session,
- obsMgrFactory.getSynchronousConsumers(),
- obsMgrFactory.getAsynchronousConsumers());
+ dispatcher.getSynchronousConsumers(),
+ dispatcher.getAsynchronousConsumers());
}
/**
@@ -188,6 +188,6 @@
* which is attached to this ObservationManager instance.
*/
public EventStateCollection createEventStateCollection() {
- return new EventStateCollection(obsMgrFactory, session, null);
+ return new EventStateCollection(dispatcher, session, null);
}
}
Index: src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerFactory.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerFactory.java (revision 441748)
+++ src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerFactory.java (working copy)
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.core.observation;
-
-import org.apache.commons.collections.Buffer;
-import org.apache.commons.collections.BufferUtils;
-import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
-import org.apache.jackrabbit.core.ItemManager;
-import org.apache.jackrabbit.core.SessionImpl;
-import org.apache.jackrabbit.core.state.ChangeLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-/**
- * The class ObservationManagerFactory creates new
- * ObservationManager instances for sessions. It also
- * creates new {@link EventStateCollection}s that can be dispatched
- * to registered {@link javax.jcr.observation.EventListener}s.
- */
-public final class ObservationManagerFactory extends EventDispatcher
- implements Runnable {
-
- /**
- * Logger instance for this class
- */
- private static final Logger log
- = LoggerFactory.getLogger(ObservationManagerFactory.class);
-
- /**
- * Dummy DispatchAction indicating the notification thread to end
- */
- private static final DispatchAction DISPOSE_MARKER = new DispatchAction(null, null);
-
- /**
- * Currently active EventConsumers for notification.
- */
- private Set activeConsumers = new HashSet();
-
- /**
- * Currently active synchronous EventConsumers for notification.
- */
- private Set synchronousConsumers = new HashSet();
-
- /**
- * Set of EventConsumers for read only Set access
- */
- private Set readOnlyConsumers;
-
- /**
- * Set of synchronous EventConsumers for read only Set access.
- */
- private Set synchronousReadOnlyConsumers;
-
- /**
- * synchronization monitor for listener changes
- */
- private Object consumerChange = new Object();
-
- /**
- * Contains the pending events that will be delivered to event listeners
- */
- private Buffer eventQueue
- = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
-
- /**
- * The background notification thread
- */
- private Thread notificationThread;
-
- /**
- * Creates a new ObservationManagerFactory instance
- * and starts the notification thread deamon.
- */
- public ObservationManagerFactory() {
- notificationThread = new Thread(this, "ObservationManager");
- notificationThread.setDaemon(true);
- notificationThread.start();
- }
-
- /**
- * Disposes this ObservationManager. This will
- * effectively stop the background notification thread.
- */
- public void dispose() {
- // dispatch dummy event to mark end of notification
- eventQueue.add(DISPOSE_MARKER);
- try {
- notificationThread.join();
- } catch (InterruptedException e) {
- // FIXME log exception ?
- }
- log.info("Notification of EventListeners stopped.");
- }
-
- /**
- * Returns an unmodifieable Set of EventConsumers.
- *
- * @return Set of EventConsumers.
- */
- Set getAsynchronousConsumers() {
- synchronized (consumerChange) {
- if (readOnlyConsumers == null) {
- readOnlyConsumers = Collections.unmodifiableSet(new HashSet(activeConsumers));
- }
- return readOnlyConsumers;
- }
- }
-
- Set getSynchronousConsumers() {
- synchronized (consumerChange) {
- if (synchronousReadOnlyConsumers == null) {
- synchronousReadOnlyConsumers = Collections.unmodifiableSet(new HashSet(synchronousConsumers));
- }
- return synchronousReadOnlyConsumers;
- }
- }
-
- /**
- * Creates a new session local ObservationManager
- * with an associated NamespaceResolver.
- *
- * @param session the session.
- * @param itemMgr the ItemManager of the session.
- * @return an ObservationManager.
- */
- public ObservationManagerImpl createObservationManager(SessionImpl session,
- ItemManager itemMgr) {
- return new ObservationManagerImpl(this, session, itemMgr);
- }
-
- /**
- * Implements the run method of the background notification
- * thread.
- */
- public void run() {
- DispatchAction action;
- while ((action = (DispatchAction) eventQueue.remove()) != DISPOSE_MARKER) {
-
- log.debug("got EventStateCollection");
- log.debug("event delivery to " + action.getEventConsumers().size() + " consumers started...");
- for (Iterator it = action.getEventConsumers().iterator(); it.hasNext();) {
- EventConsumer c = (EventConsumer) it.next();
- try {
- c.consumeEvents(action.getEventStates());
- } catch (Throwable t) {
- log.warn("EventConsumer threw exception: " + t.toString());
- log.debug("Stacktrace: ", t);
- // move on to the next consumer
- }
- }
- log.debug("event delivery finished.");
-
- }
- }
-
- /**
- * {@inheritDoc}
- *
- * Gives this observation manager the oportunity to
- * prepare the events for dispatching.
- */
- void prepareEvents(EventStateCollection events) {
- Set consumers = new HashSet();
- consumers.addAll(getSynchronousConsumers());
- consumers.addAll(getAsynchronousConsumers());
- for (Iterator it = consumers.iterator(); it.hasNext();) {
- EventConsumer c = (EventConsumer) it.next();
- c.prepareEvents(events);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- void prepareDeleted(EventStateCollection events, ChangeLog changes) {
- Set consumers = new HashSet();
- consumers.addAll(getSynchronousConsumers());
- consumers.addAll(getAsynchronousConsumers());
- for (Iterator it = consumers.iterator(); it.hasNext();) {
- EventConsumer c = (EventConsumer) it.next();
- c.prepareDeleted(events, changes.deletedStates());
- }
- }
-
- /**
- * {@inheritDoc}
- *
- * Dispatches the {@link EventStateCollection events} to all
- * registered {@link javax.jcr.observation.EventListener}s.
- */
- void dispatchEvents(EventStateCollection events) {
- // notify synchronous listeners
- Set synchronous = getSynchronousConsumers();
- if (log.isDebugEnabled()) {
- log.debug("notifying " + synchronous.size() + " synchronous listeners.");
- }
- for (Iterator it = synchronous.iterator(); it.hasNext();) {
- EventConsumer c = (EventConsumer) it.next();
- try {
- c.consumeEvents(events);
- } catch (Throwable t) {
- log.error("Synchronous EventConsumer threw exception.", t);
- // move on to next consumer
- }
- }
- eventQueue.add(new DispatchAction(events, getAsynchronousConsumers()));
- }
-
- /**
- * Adds or replaces an event consumer.
- * @param consumer the EventConsumer to add or replace.
- */
- void addConsumer(EventConsumer consumer) {
- synchronized (consumerChange) {
- if (consumer.getEventListener() instanceof SynchronousEventListener) {
- // remove existing if any
- synchronousConsumers.remove(consumer);
- // re-add it
- synchronousConsumers.add(consumer);
- // reset read only consumer set
- synchronousReadOnlyConsumers = null;
- } else {
- // remove existing if any
- activeConsumers.remove(consumer);
- // re-add it
- activeConsumers.add(consumer);
- // reset read only consumer set
- readOnlyConsumers = null;
- }
- }
- }
-
- /**
- * Unregisters an event consumer from event notification.
- * @param consumer the consumer to deregister.
- */
- void removeConsumer(EventConsumer consumer) {
- synchronized (consumerChange) {
- if (consumer.getEventListener() instanceof SynchronousEventListener) {
- synchronousConsumers.remove(consumer);
- // reset read only listener set
- synchronousReadOnlyConsumers = null;
- } else {
- activeConsumers.remove(consumer);
- // reset read only listener set
- readOnlyConsumers = null;
- }
- }
- }
-
-}
Index: src/main/java/org/apache/jackrabbit/core/observation/DelegatingObservationDispatcher.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/observation/DelegatingObservationDispatcher.java (revision 441748)
+++ src/main/java/org/apache/jackrabbit/core/observation/DelegatingObservationDispatcher.java (working copy)
@@ -46,7 +46,7 @@
*
* @param disp
*/
- public void addDispatcher(ObservationManagerFactory disp) {
+ public void addDispatcher(ObservationDispatcher disp) {
synchronized (dispatchers) {
dispatchers.add(disp);
}
@@ -57,7 +57,7 @@
*
* @param disp
*/
- public void removeDispatcher(ObservationManagerFactory disp) {
+ public void removeDispatcher(ObservationDispatcher disp) {
synchronized (dispatchers) {
dispatchers.remove(disp);
}
@@ -107,10 +107,10 @@
* @param session
*/
public void dispatch(List eventList, SessionImpl session, Path pathPrefix) {
- ObservationManagerFactory[] disp;
+ ObservationDispatcher[] disp;
synchronized (dispatchers) {
- disp = (ObservationManagerFactory[]) dispatchers.toArray(
- new ObservationManagerFactory[dispatchers.size()]);
+ disp = (ObservationDispatcher[]) dispatchers.toArray(
+ new ObservationDispatcher[dispatchers.size()]);
}
for (int i=0; i< disp.length; i++) {
EventStateCollection events =
Index: src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java
===================================================================
--- src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java (revision 441748)
+++ src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java (working copy)
@@ -21,7 +21,7 @@
import org.apache.jackrabbit.core.lock.LockManager;
import org.apache.jackrabbit.core.observation.EventStateCollection;
import org.apache.jackrabbit.core.observation.EventStateCollectionFactory;
-import org.apache.jackrabbit.core.observation.ObservationManagerFactory;
+import org.apache.jackrabbit.core.observation.ObservationDispatcher;
import org.apache.jackrabbit.core.observation.ObservationManagerImpl;
import org.apache.jackrabbit.core.query.QueryManagerImpl;
import org.apache.jackrabbit.core.state.LocalItemStateManager;
@@ -532,18 +532,21 @@
}
/**
- * {@inheritDoc}
+ * Returns the observation manager of this session. The observation manager
+ * is lazily created if it does not exist yet.
+ *
+ * @return the observation manager of this session
+ * @throws RepositoryException if a repository error occurs
*/
- public ObservationManager getObservationManager()
- throws UnsupportedRepositoryOperationException, RepositoryException {
+ public ObservationManager getObservationManager() throws RepositoryException {
// check state of this instance
sanityCheck();
if (obsMgr == null) {
try {
- ObservationManagerFactory factory =
- rep.getObservationManagerFactory(wspConfig.getName());
- obsMgr = factory.createObservationManager(session, session.getItemManager());
+ ObservationDispatcher factory =
+ rep.getObservationDispatcher(wspConfig.getName());
+ obsMgr = new ObservationManagerImpl(factory, session, session.getItemManager());
} catch (NoSuchWorkspaceException nswe) {
// should never get here
String msg = "internal error: failed to instantiate observation manager";