Index: src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (revision 441748) +++ src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (working copy) @@ -37,7 +37,7 @@ import org.apache.jackrabbit.core.nodetype.NodeTypeRegistry; import org.apache.jackrabbit.core.nodetype.virtual.VirtualNodeTypeStateManager; import org.apache.jackrabbit.core.observation.DelegatingObservationDispatcher; -import org.apache.jackrabbit.core.observation.ObservationManagerFactory; +import org.apache.jackrabbit.core.observation.ObservationDispatcher; import org.apache.jackrabbit.core.security.AuthContext; import org.apache.jackrabbit.core.state.ItemStateException; import org.apache.jackrabbit.core.state.PMContext; @@ -712,12 +712,12 @@ return getWorkspaceInfo(workspaceName).getItemStateProvider(); } - ObservationManagerFactory getObservationManagerFactory(String workspaceName) + ObservationDispatcher getObservationDispatcher(String workspaceName) throws NoSuchWorkspaceException { // check sanity of this instance sanityCheck(); - return getWorkspaceInfo(workspaceName).getObservationManagerFactory(); + return getWorkspaceInfo(workspaceName).getObservationDispatcher(); } /** @@ -1294,7 +1294,7 @@ /** * observation manager factory (instantiated on init) */ - private ObservationManagerFactory obsMgrFactory; + private ObservationDispatcher dispatcher; /** * system session (lazily instantiated) @@ -1457,13 +1457,13 @@ * * @return the observation manager factory for this workspace */ - ObservationManagerFactory getObservationManagerFactory() { + ObservationDispatcher getObservationDispatcher() { if (!isInitialized()) { throw new IllegalStateException("workspace '" + getName() + "' not initialized"); } - return obsMgrFactory; + return dispatcher; } /** @@ -1623,10 +1623,10 @@ throw new RepositoryException(msg, ise); } - obsMgrFactory = new ObservationManagerFactory(); + dispatcher = new ObservationDispatcher(); // register the observation factory of that workspace - delegatingDispatcher.addDispatcher(obsMgrFactory); + delegatingDispatcher.addDispatcher(dispatcher); initialized = true; @@ -1691,11 +1691,11 @@ log.info("shutting down workspace '" + getName() + "'..."); // deregister the observation factory of that workspace - delegatingDispatcher.removeDispatcher(obsMgrFactory); + delegatingDispatcher.removeDispatcher(dispatcher); // dispose observation manager factory - obsMgrFactory.dispose(); - obsMgrFactory = null; + dispatcher.dispose(); + dispatcher = null; // shutdown search managers if (searchMgr != null) { Index: src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java (revision 0) @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.observation; + +import org.apache.commons.collections.Buffer; +import org.apache.commons.collections.BufferUtils; +import org.apache.commons.collections.buffer.UnboundedFifoBuffer; +import org.apache.jackrabbit.core.state.ChangeLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +/** + * Dispatcher for dispatching events to listeners within a single workspace. + */ +public final class ObservationDispatcher extends EventDispatcher + implements Runnable { + + /** + * Logger instance for this class + */ + private static final Logger log + = LoggerFactory.getLogger(ObservationDispatcher.class); + + /** + * Dummy DispatchAction indicating the notification thread to end + */ + private static final DispatchAction DISPOSE_MARKER = new DispatchAction(null, null); + + /** + * Currently active EventConsumers for notification. + */ + private Set activeConsumers = new HashSet(); + + /** + * Currently active synchronous EventConsumers for notification. + */ + private Set synchronousConsumers = new HashSet(); + + /** + * Set of EventConsumers for read only Set access + */ + private Set readOnlyConsumers; + + /** + * Set of synchronous EventConsumers for read only Set access. + */ + private Set synchronousReadOnlyConsumers; + + /** + * synchronization monitor for listener changes + */ + private Object consumerChange = new Object(); + + /** + * Contains the pending events that will be delivered to event listeners + */ + private Buffer eventQueue + = BufferUtils.blockingBuffer(new UnboundedFifoBuffer()); + + /** + * The background notification thread + */ + private Thread notificationThread; + + /** + * Creates a new ObservationDispatcher instance + * and starts the notification thread deamon. + */ + public ObservationDispatcher() { + notificationThread = new Thread(this, "ObservationManager"); + notificationThread.setDaemon(true); + notificationThread.start(); + } + + /** + * Disposes this ObservationManager. This will + * effectively stop the background notification thread. + */ + public void dispose() { + // dispatch dummy event to mark end of notification + eventQueue.add(DISPOSE_MARKER); + try { + notificationThread.join(); + } catch (InterruptedException e) { + // FIXME log exception ? + } + log.info("Notification of EventListeners stopped."); + } + + /** + * Returns an unmodifieable Set of EventConsumers. + * + * @return Set of EventConsumers. + */ + Set getAsynchronousConsumers() { + synchronized (consumerChange) { + if (readOnlyConsumers == null) { + readOnlyConsumers = Collections.unmodifiableSet(new HashSet(activeConsumers)); + } + return readOnlyConsumers; + } + } + + Set getSynchronousConsumers() { + synchronized (consumerChange) { + if (synchronousReadOnlyConsumers == null) { + synchronousReadOnlyConsumers = Collections.unmodifiableSet(new HashSet(synchronousConsumers)); + } + return synchronousReadOnlyConsumers; + } + } + + /** + * Implements the run method of the background notification + * thread. + */ + public void run() { + DispatchAction action; + while ((action = (DispatchAction) eventQueue.remove()) != DISPOSE_MARKER) { + + log.debug("got EventStateCollection"); + log.debug("event delivery to " + action.getEventConsumers().size() + " consumers started..."); + for (Iterator it = action.getEventConsumers().iterator(); it.hasNext();) { + EventConsumer c = (EventConsumer) it.next(); + try { + c.consumeEvents(action.getEventStates()); + } catch (Throwable t) { + log.warn("EventConsumer threw exception: " + t.toString()); + log.debug("Stacktrace: ", t); + // move on to the next consumer + } + } + log.debug("event delivery finished."); + + } + } + + /** + * {@inheritDoc} + *

+ * Gives this observation manager the oportunity to + * prepare the events for dispatching. + */ + void prepareEvents(EventStateCollection events) { + Set consumers = new HashSet(); + consumers.addAll(getSynchronousConsumers()); + consumers.addAll(getAsynchronousConsumers()); + for (Iterator it = consumers.iterator(); it.hasNext();) { + EventConsumer c = (EventConsumer) it.next(); + c.prepareEvents(events); + } + } + + /** + * {@inheritDoc} + */ + void prepareDeleted(EventStateCollection events, ChangeLog changes) { + Set consumers = new HashSet(); + consumers.addAll(getSynchronousConsumers()); + consumers.addAll(getAsynchronousConsumers()); + for (Iterator it = consumers.iterator(); it.hasNext();) { + EventConsumer c = (EventConsumer) it.next(); + c.prepareDeleted(events, changes.deletedStates()); + } + } + + /** + * {@inheritDoc} + *

+ * Dispatches the {@link EventStateCollection events} to all + * registered {@link javax.jcr.observation.EventListener}s. + */ + void dispatchEvents(EventStateCollection events) { + // notify synchronous listeners + Set synchronous = getSynchronousConsumers(); + if (log.isDebugEnabled()) { + log.debug("notifying " + synchronous.size() + " synchronous listeners."); + } + for (Iterator it = synchronous.iterator(); it.hasNext();) { + EventConsumer c = (EventConsumer) it.next(); + try { + c.consumeEvents(events); + } catch (Throwable t) { + log.error("Synchronous EventConsumer threw exception.", t); + // move on to next consumer + } + } + eventQueue.add(new DispatchAction(events, getAsynchronousConsumers())); + } + + /** + * Adds or replaces an event consumer. + * @param consumer the EventConsumer to add or replace. + */ + void addConsumer(EventConsumer consumer) { + synchronized (consumerChange) { + if (consumer.getEventListener() instanceof SynchronousEventListener) { + // remove existing if any + synchronousConsumers.remove(consumer); + // re-add it + synchronousConsumers.add(consumer); + // reset read only consumer set + synchronousReadOnlyConsumers = null; + } else { + // remove existing if any + activeConsumers.remove(consumer); + // re-add it + activeConsumers.add(consumer); + // reset read only consumer set + readOnlyConsumers = null; + } + } + } + + /** + * Unregisters an event consumer from event notification. + * @param consumer the consumer to deregister. + */ + void removeConsumer(EventConsumer consumer) { + synchronized (consumerChange) { + if (consumer.getEventListener() instanceof SynchronousEventListener) { + synchronousConsumers.remove(consumer); + // reset read only listener set + synchronousReadOnlyConsumers = null; + } else { + activeConsumers.remove(consumer); + // reset read only listener set + readOnlyConsumers = null; + } + } + } + +} Property changes on: src/main/java/org/apache/jackrabbit/core/observation/ObservationDispatcher.java ___________________________________________________________________ Name: svn:eol-style + native Index: src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java (revision 441748) +++ src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java (working copy) @@ -53,9 +53,9 @@ private final ItemManager itemMgr; /** - * The ObservationManagerFactory + * The observation dispatcher */ - private final ObservationManagerFactory obsMgrFactory; + private final ObservationDispatcher dispatcher; static { // preload EventListenerIteratorImpl to prevent classloader issues during shutdown @@ -72,9 +72,9 @@ * @throws NullPointerException if session or itemMgr * is null. */ - ObservationManagerImpl(ObservationManagerFactory obsMgrFactory, - SessionImpl session, - ItemManager itemMgr) throws NullPointerException { + public ObservationManagerImpl( + ObservationDispatcher dispatcher, SessionImpl session, + ItemManager itemMgr) throws NullPointerException { if (session == null) { throw new NullPointerException("session"); } @@ -82,7 +82,7 @@ throw new NullPointerException("itemMgr"); } - this.obsMgrFactory = obsMgrFactory; + this.dispatcher = dispatcher; this.session = session; this.itemMgr = itemMgr; } @@ -138,7 +138,7 @@ EventConsumer consumer = new EventConsumer(session, listener, filter); - obsMgrFactory.addConsumer(consumer); + dispatcher.addConsumer(consumer); } /** @@ -148,7 +148,7 @@ throws RepositoryException { EventConsumer consumer = new EventConsumer(session, listener, EventFilter.BLOCK_ALL); - obsMgrFactory.removeConsumer(consumer); + dispatcher.removeConsumer(consumer); } @@ -158,8 +158,8 @@ public EventListenerIterator getRegisteredEventListeners() throws RepositoryException { return new EventListenerIteratorImpl(session, - obsMgrFactory.getSynchronousConsumers(), - obsMgrFactory.getAsynchronousConsumers()); + dispatcher.getSynchronousConsumers(), + dispatcher.getAsynchronousConsumers()); } /** @@ -188,6 +188,6 @@ * which is attached to this ObservationManager instance. */ public EventStateCollection createEventStateCollection() { - return new EventStateCollection(obsMgrFactory, session, null); + return new EventStateCollection(dispatcher, session, null); } } Index: src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerFactory.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerFactory.java (revision 441748) +++ src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerFactory.java (working copy) @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.jackrabbit.core.observation; - -import org.apache.commons.collections.Buffer; -import org.apache.commons.collections.BufferUtils; -import org.apache.commons.collections.buffer.UnboundedFifoBuffer; -import org.apache.jackrabbit.core.ItemManager; -import org.apache.jackrabbit.core.SessionImpl; -import org.apache.jackrabbit.core.state.ChangeLog; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -/** - * The class ObservationManagerFactory creates new - * ObservationManager instances for sessions. It also - * creates new {@link EventStateCollection}s that can be dispatched - * to registered {@link javax.jcr.observation.EventListener}s. - */ -public final class ObservationManagerFactory extends EventDispatcher - implements Runnable { - - /** - * Logger instance for this class - */ - private static final Logger log - = LoggerFactory.getLogger(ObservationManagerFactory.class); - - /** - * Dummy DispatchAction indicating the notification thread to end - */ - private static final DispatchAction DISPOSE_MARKER = new DispatchAction(null, null); - - /** - * Currently active EventConsumers for notification. - */ - private Set activeConsumers = new HashSet(); - - /** - * Currently active synchronous EventConsumers for notification. - */ - private Set synchronousConsumers = new HashSet(); - - /** - * Set of EventConsumers for read only Set access - */ - private Set readOnlyConsumers; - - /** - * Set of synchronous EventConsumers for read only Set access. - */ - private Set synchronousReadOnlyConsumers; - - /** - * synchronization monitor for listener changes - */ - private Object consumerChange = new Object(); - - /** - * Contains the pending events that will be delivered to event listeners - */ - private Buffer eventQueue - = BufferUtils.blockingBuffer(new UnboundedFifoBuffer()); - - /** - * The background notification thread - */ - private Thread notificationThread; - - /** - * Creates a new ObservationManagerFactory instance - * and starts the notification thread deamon. - */ - public ObservationManagerFactory() { - notificationThread = new Thread(this, "ObservationManager"); - notificationThread.setDaemon(true); - notificationThread.start(); - } - - /** - * Disposes this ObservationManager. This will - * effectively stop the background notification thread. - */ - public void dispose() { - // dispatch dummy event to mark end of notification - eventQueue.add(DISPOSE_MARKER); - try { - notificationThread.join(); - } catch (InterruptedException e) { - // FIXME log exception ? - } - log.info("Notification of EventListeners stopped."); - } - - /** - * Returns an unmodifieable Set of EventConsumers. - * - * @return Set of EventConsumers. - */ - Set getAsynchronousConsumers() { - synchronized (consumerChange) { - if (readOnlyConsumers == null) { - readOnlyConsumers = Collections.unmodifiableSet(new HashSet(activeConsumers)); - } - return readOnlyConsumers; - } - } - - Set getSynchronousConsumers() { - synchronized (consumerChange) { - if (synchronousReadOnlyConsumers == null) { - synchronousReadOnlyConsumers = Collections.unmodifiableSet(new HashSet(synchronousConsumers)); - } - return synchronousReadOnlyConsumers; - } - } - - /** - * Creates a new session local ObservationManager - * with an associated NamespaceResolver. - * - * @param session the session. - * @param itemMgr the ItemManager of the session. - * @return an ObservationManager. - */ - public ObservationManagerImpl createObservationManager(SessionImpl session, - ItemManager itemMgr) { - return new ObservationManagerImpl(this, session, itemMgr); - } - - /** - * Implements the run method of the background notification - * thread. - */ - public void run() { - DispatchAction action; - while ((action = (DispatchAction) eventQueue.remove()) != DISPOSE_MARKER) { - - log.debug("got EventStateCollection"); - log.debug("event delivery to " + action.getEventConsumers().size() + " consumers started..."); - for (Iterator it = action.getEventConsumers().iterator(); it.hasNext();) { - EventConsumer c = (EventConsumer) it.next(); - try { - c.consumeEvents(action.getEventStates()); - } catch (Throwable t) { - log.warn("EventConsumer threw exception: " + t.toString()); - log.debug("Stacktrace: ", t); - // move on to the next consumer - } - } - log.debug("event delivery finished."); - - } - } - - /** - * {@inheritDoc} - *

- * Gives this observation manager the oportunity to - * prepare the events for dispatching. - */ - void prepareEvents(EventStateCollection events) { - Set consumers = new HashSet(); - consumers.addAll(getSynchronousConsumers()); - consumers.addAll(getAsynchronousConsumers()); - for (Iterator it = consumers.iterator(); it.hasNext();) { - EventConsumer c = (EventConsumer) it.next(); - c.prepareEvents(events); - } - } - - /** - * {@inheritDoc} - */ - void prepareDeleted(EventStateCollection events, ChangeLog changes) { - Set consumers = new HashSet(); - consumers.addAll(getSynchronousConsumers()); - consumers.addAll(getAsynchronousConsumers()); - for (Iterator it = consumers.iterator(); it.hasNext();) { - EventConsumer c = (EventConsumer) it.next(); - c.prepareDeleted(events, changes.deletedStates()); - } - } - - /** - * {@inheritDoc} - *

- * Dispatches the {@link EventStateCollection events} to all - * registered {@link javax.jcr.observation.EventListener}s. - */ - void dispatchEvents(EventStateCollection events) { - // notify synchronous listeners - Set synchronous = getSynchronousConsumers(); - if (log.isDebugEnabled()) { - log.debug("notifying " + synchronous.size() + " synchronous listeners."); - } - for (Iterator it = synchronous.iterator(); it.hasNext();) { - EventConsumer c = (EventConsumer) it.next(); - try { - c.consumeEvents(events); - } catch (Throwable t) { - log.error("Synchronous EventConsumer threw exception.", t); - // move on to next consumer - } - } - eventQueue.add(new DispatchAction(events, getAsynchronousConsumers())); - } - - /** - * Adds or replaces an event consumer. - * @param consumer the EventConsumer to add or replace. - */ - void addConsumer(EventConsumer consumer) { - synchronized (consumerChange) { - if (consumer.getEventListener() instanceof SynchronousEventListener) { - // remove existing if any - synchronousConsumers.remove(consumer); - // re-add it - synchronousConsumers.add(consumer); - // reset read only consumer set - synchronousReadOnlyConsumers = null; - } else { - // remove existing if any - activeConsumers.remove(consumer); - // re-add it - activeConsumers.add(consumer); - // reset read only consumer set - readOnlyConsumers = null; - } - } - } - - /** - * Unregisters an event consumer from event notification. - * @param consumer the consumer to deregister. - */ - void removeConsumer(EventConsumer consumer) { - synchronized (consumerChange) { - if (consumer.getEventListener() instanceof SynchronousEventListener) { - synchronousConsumers.remove(consumer); - // reset read only listener set - synchronousReadOnlyConsumers = null; - } else { - activeConsumers.remove(consumer); - // reset read only listener set - readOnlyConsumers = null; - } - } - } - -} Index: src/main/java/org/apache/jackrabbit/core/observation/DelegatingObservationDispatcher.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/observation/DelegatingObservationDispatcher.java (revision 441748) +++ src/main/java/org/apache/jackrabbit/core/observation/DelegatingObservationDispatcher.java (working copy) @@ -46,7 +46,7 @@ * * @param disp */ - public void addDispatcher(ObservationManagerFactory disp) { + public void addDispatcher(ObservationDispatcher disp) { synchronized (dispatchers) { dispatchers.add(disp); } @@ -57,7 +57,7 @@ * * @param disp */ - public void removeDispatcher(ObservationManagerFactory disp) { + public void removeDispatcher(ObservationDispatcher disp) { synchronized (dispatchers) { dispatchers.remove(disp); } @@ -107,10 +107,10 @@ * @param session */ public void dispatch(List eventList, SessionImpl session, Path pathPrefix) { - ObservationManagerFactory[] disp; + ObservationDispatcher[] disp; synchronized (dispatchers) { - disp = (ObservationManagerFactory[]) dispatchers.toArray( - new ObservationManagerFactory[dispatchers.size()]); + disp = (ObservationDispatcher[]) dispatchers.toArray( + new ObservationDispatcher[dispatchers.size()]); } for (int i=0; i< disp.length; i++) { EventStateCollection events = Index: src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java (revision 441748) +++ src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java (working copy) @@ -21,7 +21,7 @@ import org.apache.jackrabbit.core.lock.LockManager; import org.apache.jackrabbit.core.observation.EventStateCollection; import org.apache.jackrabbit.core.observation.EventStateCollectionFactory; -import org.apache.jackrabbit.core.observation.ObservationManagerFactory; +import org.apache.jackrabbit.core.observation.ObservationDispatcher; import org.apache.jackrabbit.core.observation.ObservationManagerImpl; import org.apache.jackrabbit.core.query.QueryManagerImpl; import org.apache.jackrabbit.core.state.LocalItemStateManager; @@ -532,18 +532,21 @@ } /** - * {@inheritDoc} + * Returns the observation manager of this session. The observation manager + * is lazily created if it does not exist yet. + * + * @return the observation manager of this session + * @throws RepositoryException if a repository error occurs */ - public ObservationManager getObservationManager() - throws UnsupportedRepositoryOperationException, RepositoryException { + public ObservationManager getObservationManager() throws RepositoryException { // check state of this instance sanityCheck(); if (obsMgr == null) { try { - ObservationManagerFactory factory = - rep.getObservationManagerFactory(wspConfig.getName()); - obsMgr = factory.createObservationManager(session, session.getItemManager()); + ObservationDispatcher factory = + rep.getObservationDispatcher(wspConfig.getName()); + obsMgr = new ObservationManagerImpl(factory, session, session.getItemManager()); } catch (NoSuchWorkspaceException nswe) { // should never get here String msg = "internal error: failed to instantiate observation manager";