Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/JournalWriterProvider.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/JournalWriterProvider.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/JournalWriterProvider.java (revision ) @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.observation2; + +import static org.apache.jackrabbit.JcrConstants.JCR_SYSTEM; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.jackrabbit.oak.plugins.observation2.JournalWriter.EventRecorder; +import org.apache.jackrabbit.oak.spi.commit.Editor; +import org.apache.jackrabbit.oak.spi.commit.EditorProvider; +import org.apache.jackrabbit.oak.spi.commit.VisibleEditor; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; + +/** + * TODO doc + * FIXME don't hard code constants + */ +public class JournalWriterProvider implements EditorProvider { + public static final AtomicLong BUNDLE_ID = new AtomicLong(); + + @Override + public Editor getRootEditor(NodeState before, NodeState after, NodeBuilder builder) { + NodeBuilder journal = builder.child(JCR_SYSTEM) + .child("rep:journal") + .child(String.valueOf(BUNDLE_ID.getAndIncrement())); + String userId = getUserId(); + String userData = getUserData(); + EventRecorder eventRecorder = new EventRecorder(journal, userId, userData); + return VisibleEditor.wrap(new JournalWriter(eventRecorder, "/")); + } + + private String getUserData() { + return ""; // todo implement getUserData + } + + private String getUserId() { + return ""; // todo implement getUserId + } +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventImpl.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventImpl.java (revision ) @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.observation2; + +import java.util.Collections; +import java.util.Map; + +import javax.jcr.RepositoryException; +import javax.jcr.observation.Event; + +/** + * TODO document + */ +public class EventImpl implements Event { + private final int type; + private final String path; + private final String userID; + private final String identifier; + private final Map info; + private final long date; + private final String userData; + + public EventImpl(int type, String path, String userID, String identifier, Map info, long date, String userData) { + this.type = type; + this.path = path; + this.userID = userID; + this.identifier = identifier; + this.info = info == null ? Collections.emptyMap() : info; + this.date = date; + this.userData = userData; + } + + @Override + public int getType() { + return type; + } + + @Override + public String getPath() throws RepositoryException { + return path; + } + + @Override + public String getUserID() { + return userID; + } + + @Override + public String getIdentifier() throws RepositoryException { + return identifier; + } + + @Override + public Map getInfo() throws RepositoryException { + return info; + } + + @Override + public String getUserData() throws RepositoryException { + return userData; + } + + @Override + public long getDate() throws RepositoryException { + return date; + } + + @Override + public final boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + + EventImpl that = (EventImpl) other; + return date == that.date && type == that.type && + (identifier == null ? that.identifier == null : identifier.equals(that.identifier)) && + (info == null ? that.info == null : info.equals(that.info)) && + (path == null ? that.path == null : path.equals(that.path)) && + (userID == null ? that.userID == null : userID.equals(that.userID)) && + (userData == null ? that.userData == null : userData.equals(that.userData)); + + } + + @Override + public final int hashCode() { + int result = type; + result = 31 * result + (path == null ? 0 : path.hashCode()); + result = 31 * result + (userID == null ? 0 : userID.hashCode()); + result = 31 * result + (identifier == null ? 0 : identifier.hashCode()); + result = 31 * result + (info == null ? 0 : info.hashCode()); + result = 31 * result + (int) (date ^ (date >>> 32)); + result = 31 * result + (userData == null ? 0 : userData.hashCode()); + return result; + } + + @Override + public String toString() { + return "EventImpl{" + + "type=" + type + + ", path='" + path + '\'' + + ", userID='" + userID + '\'' + + ", identifier='" + identifier + '\'' + + ", info=" + info + + ", date=" + date + + ", userData=" + userData + + '}'; + } +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ChangeFilter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ChangeFilter.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ChangeFilter.java (revision ) @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.observation2; + +import javax.annotation.CheckForNull; +import javax.annotation.Nullable; +import javax.jcr.RepositoryException; +import javax.jcr.nodetype.NoSuchNodeTypeException; + +import org.apache.jackrabbit.oak.api.Tree; +import org.apache.jackrabbit.oak.commons.PathUtils; +import org.apache.jackrabbit.oak.core.ReadOnlyTree; +import org.apache.jackrabbit.oak.namepath.NamePathMapper; +import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager; +import org.apache.jackrabbit.oak.spi.state.NodeState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TODO document + */ +class ChangeFilter { + + private static final Logger log = LoggerFactory.getLogger(ChangeFilter.class); + + private final ReadOnlyNodeTypeManager ntMgr; + private final NamePathMapper namePathMapper; + private final int eventTypes; + private final String path; + private final boolean deep; + private final String[] uuid; // TODO implement filtering by uuid + private final String[] nodeTypeOakName; + private final boolean noLocal; // TODO implement filtering by noLocal + + public ChangeFilter(ReadOnlyNodeTypeManager ntMgr, + NamePathMapper namePathMapper, int eventTypes, + String path, boolean deep, String[] uuid, + String[] nodeTypeName, boolean noLocal) + throws NoSuchNodeTypeException, RepositoryException { + this.ntMgr = ntMgr; + this.namePathMapper = namePathMapper; + this.eventTypes = eventTypes; + this.path = path; + this.deep = deep; + this.uuid = uuid; + this.nodeTypeOakName = validateNodeTypeNames(nodeTypeName); + this.noLocal = noLocal; + } + + public boolean include(int eventType, String path, @Nullable NodeState associatedParentNode) { + return include(eventType) + && include(path) + && (associatedParentNode == null + || includeByType(new ReadOnlyTree(associatedParentNode))); + } + + public boolean includeChildren(String path) { + return PathUtils.isAncestor(path, this.path) || + path.equals(this.path) || + deep && PathUtils.isAncestor(this.path, path); + } + + //-----------------------------< internal >--------------------------------- + + private boolean include(int eventType) { + return (this.eventTypes & eventType) != 0; + } + + private boolean include(String path) { + boolean equalPaths = this.path.equals(path); + if (!deep && !equalPaths) { + return false; + } + if (deep && !(PathUtils.isAncestor(this.path, path) || equalPaths)) { + return false; + } + return true; + } + + /** + * Checks whether to include an event based on the type of the associated + * parent node and the node type filter. + * + * @param associatedParentNode the associated parent node of the event. + * @return whether to include the event based on the type of the associated + * parent node. + */ + private boolean includeByType(Tree associatedParentNode) { + if (nodeTypeOakName == null) { + return true; + } else { + for (String oakName : nodeTypeOakName) { + if (ntMgr.isNodeType(associatedParentNode, oakName)) { + return true; + } + } + // filter has node types set but none matched + return false; + } + } + + /** + * Validates the given node type names. + * + * @param nodeTypeNames the node type names. + * @return the node type names as oak names. + * @throws javax.jcr.nodetype.NoSuchNodeTypeException if one of the node type names refers to + * an non-existing node type. + * @throws javax.jcr.RepositoryException if an error occurs while reading from the + * node type manager. + */ + @CheckForNull + private String[] validateNodeTypeNames(@Nullable String[] nodeTypeNames) + throws NoSuchNodeTypeException, RepositoryException { + if (nodeTypeNames == null) { + return null; + } + String[] oakNames = new String[nodeTypeNames.length]; + for (int i = 0; i < nodeTypeNames.length; i++) { + ntMgr.getNodeType(nodeTypeNames[i]); + oakNames[i] = namePathMapper.getOakName(nodeTypeNames[i]); + } + return oakNames; + } +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (revision 40987cfd09b2ee7595815d15f6b1a5acb2a99382) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (revision ) @@ -16,6 +16,10 @@ */ package org.apache.jackrabbit.oak; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Lists.newArrayList; +import static java.util.concurrent.Executors.newScheduledThreadPool; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -37,9 +41,10 @@ import org.apache.jackrabbit.oak.plugins.index.CompositeIndexHookProvider; import org.apache.jackrabbit.oak.plugins.index.IndexHookManager; import org.apache.jackrabbit.oak.plugins.index.IndexHookProvider; +import org.apache.jackrabbit.oak.plugins.observation2.JournalWriterProvider; import org.apache.jackrabbit.oak.spi.commit.CommitHook; -import org.apache.jackrabbit.oak.spi.commit.CompositeHook; import org.apache.jackrabbit.oak.spi.commit.CompositeEditorProvider; +import org.apache.jackrabbit.oak.spi.commit.CompositeHook; import org.apache.jackrabbit.oak.spi.commit.ConflictHandler; import org.apache.jackrabbit.oak.spi.commit.Editor; import org.apache.jackrabbit.oak.spi.commit.EditorHook; @@ -56,10 +61,6 @@ import org.apache.jackrabbit.oak.spi.state.NodeState; import org.apache.jackrabbit.oak.spi.state.NodeStore; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.collect.Lists.newArrayList; -import static java.util.concurrent.Executors.newScheduledThreadPool; - /** * Builder class for constructing {@link ContentRepository} instances with * a set of specified plugin components. This class acts as a public facade @@ -267,6 +268,7 @@ // add index hooks later to prevent the OakInitializer to do excessive indexing with(IndexHookManager.of(indexHooks)); + with(new JournalWriterProvider()); withEditorHook(); CommitHook commitHook = CompositeHook.compose(commitHooks); return new ContentRepositoryImpl( \ No newline at end of file Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventCollector.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventCollector.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventCollector.java (revision ) @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.observation2; + +import java.util.Iterator; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jcr.observation.Event; +import javax.jcr.observation.EventListener; + +import org.apache.jackrabbit.commons.iterator.EventIteratorAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * TODO document + */ +class EventCollector implements Runnable { + private static final Logger log = LoggerFactory.getLogger(EventCollector.class); + + private final ObservationManagerImpl observationManager; + private final EventBundles eventBundles; + private final EventListener listener; + private final AtomicReference filterRef; + private volatile boolean running; + private ScheduledFuture future; + + public EventCollector(ObservationManagerImpl observationManager, EventListener listener, ChangeFilter filter) { + this.observationManager = observationManager; + this.eventBundles = new EventBundles(observationManager.getContentSession().getLatestRoot()); + this.listener = listener; + filterRef = new AtomicReference(filter); + } + + public void setFilter(ChangeFilter filter) { + filterRef.set(filter); + } + + /** + * Stop this change processor if running. After returning from this methods no further + * events will be delivered. + * @throws IllegalStateException if not yet started or stopped already + */ + public synchronized void stop() { + if (future == null) { + throw new IllegalStateException("Change processor not started"); + } + + try { + future.cancel(true); + while (running) { + wait(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + future = null; + } + } + + /** + * Start the change processor on the passed {@code executor}. + * @param executor + * @throws IllegalStateException if started already + */ + public synchronized void start(ScheduledExecutorService executor) { + if (future != null) { + throw new IllegalStateException("Change processor started already"); + } + future = executor.scheduleWithFixedDelay(this, 100, 1000, TimeUnit.MILLISECONDS); + } + + @Override + public void run() { + running = true; + try { + Iterator bundle = eventBundles.getNext(); + if (bundle != null) { + observationManager.setHasEvents(); + listener.onEvent(new EventIteratorAdapter(bundle)); + } + } catch (Exception e) { + log.error("Unable to generate or send events", e); + } finally { + synchronized (this) { + running = false; + notifyAll(); + } + } + } + +} Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java (revision 40987cfd09b2ee7595815d15f6b1a5acb2a99382) +++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/SessionContext.java (revision ) @@ -16,9 +16,12 @@ */ package org.apache.jackrabbit.oak.jcr; +import static com.google.common.base.Preconditions.checkNotNull; + import java.util.ArrayList; import java.util.List; import java.util.Map; + import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import javax.jcr.PathNotFoundException; @@ -43,14 +46,12 @@ import org.apache.jackrabbit.oak.plugins.name.Namespaces; import org.apache.jackrabbit.oak.plugins.nodetype.DefinitionProvider; import org.apache.jackrabbit.oak.plugins.nodetype.EffectiveNodeTypeProvider; -import org.apache.jackrabbit.oak.plugins.observation.ObservationManagerImpl; +import org.apache.jackrabbit.oak.plugins.observation2.ObservationManagerImpl; import org.apache.jackrabbit.oak.plugins.value.ValueFactoryImpl; import org.apache.jackrabbit.oak.spi.security.SecurityConfiguration; import org.apache.jackrabbit.oak.spi.security.SecurityProvider; import org.apache.jackrabbit.oak.spi.security.authorization.permission.PermissionProvider; import org.apache.jackrabbit.oak.spi.xml.ProtectedItemImporter; - -import static com.google.common.base.Preconditions.checkNotNull; /** * Instances of this class are passed to all JCR implementation classes Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/JournalWriter.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/JournalWriter.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/JournalWriter.java (revision ) @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.observation2; + +import static javax.jcr.observation.Event.NODE_ADDED; +import static javax.jcr.observation.Event.NODE_REMOVED; +import static javax.jcr.observation.Event.PROPERTY_ADDED; +import static javax.jcr.observation.Event.PROPERTY_CHANGED; +import static javax.jcr.observation.Event.PROPERTY_REMOVED; + +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.api.PropertyState; +import org.apache.jackrabbit.oak.commons.PathUtils; +import org.apache.jackrabbit.oak.spi.commit.DefaultEditor; +import org.apache.jackrabbit.oak.spi.commit.Editor; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeState; + +/** + * TODO doc + * FIXME don't hard code constants + */ +public class JournalWriter extends DefaultEditor { + private final EventRecorder eventRecorder; + private final String path; + + private int eventId; + + public JournalWriter(EventRecorder eventRecorder, String path) { + this.eventRecorder = eventRecorder; + this.path = path; + } + + @Override + public void propertyAdded(PropertyState after) throws CommitFailedException { + eventRecorder.recordEvent(PROPERTY_ADDED, PathUtils.concat(path, after.getName()), getIdentifier(after)); + } + + @Override + public void propertyChanged(PropertyState before, PropertyState after) throws CommitFailedException { + eventRecorder.recordEvent(PROPERTY_CHANGED, PathUtils.concat(path, after.getName()), getIdentifier(after)); + } + + @Override + public void propertyDeleted(PropertyState before) throws CommitFailedException { + eventRecorder.recordEvent(PROPERTY_REMOVED, PathUtils.concat(path, before.getName()), getIdentifier(before)); + } + + @Override + public Editor childNodeAdded(String name, NodeState after) throws CommitFailedException { + eventRecorder.recordEvent(NODE_ADDED, PathUtils.concat(path, name), getIdentifier(name, after)); + return new JournalWriter(eventRecorder, PathUtils.concat(path, name)); + } + + @Override + public Editor childNodeChanged(String name, NodeState before, NodeState after) throws CommitFailedException { + return new JournalWriter(eventRecorder, PathUtils.concat(path, name)); + } + + @Override + public Editor childNodeDeleted(String name, NodeState before) throws CommitFailedException { + eventRecorder.recordEvent(NODE_REMOVED, PathUtils.concat(path, name), getIdentifier(name, before)); + return new JournalWriter(eventRecorder, PathUtils.concat(path, name)); + } + + private String getIdentifier(PropertyState after) { + return ""; // TODO implement getIdentifier: return identifier of parent of after + } + + private String getIdentifier(String name, NodeState after) { + return ""; // TODO implement getIdentifier + } + + public static class EventRecorder { + private final NodeBuilder journal; + private final String userId; + private final String userData; + private final long date; + + private long eventId; + + public EventRecorder(NodeBuilder journal, String userId, String userData) { + this.journal = journal; + this.userId = userId; + this.userData = userData; + this.date = System.currentTimeMillis(); + } + + private void recordEvent(int type, String path, String identifier) { + NodeBuilder event = journal.child(String.valueOf(eventId++)); + event.setProperty("type", type); + event.setProperty("path", path); + event.setProperty("identifier", identifier); + event.setProperty("userId", userId); + event.setProperty("date", date); + event.setProperty("userData", userData); + } + } + +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationManagerImpl.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationManagerImpl.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/ObservationManagerImpl.java (revision ) @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.observation2; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jcr.RepositoryException; +import javax.jcr.UnsupportedRepositoryOperationException; +import javax.jcr.observation.EventJournal; +import javax.jcr.observation.EventListener; +import javax.jcr.observation.EventListenerIterator; +import javax.jcr.observation.ObservationManager; + +import org.apache.jackrabbit.commons.iterator.EventListenerIteratorAdapter; +import org.apache.jackrabbit.oak.api.ContentSession; +import org.apache.jackrabbit.oak.api.Root; +import org.apache.jackrabbit.oak.namepath.NamePathMapper; +import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager; + +/** + * TODO document + */ +public class ObservationManagerImpl implements ObservationManager { + private final NamePathMapper namePathMapper; + private final ScheduledExecutorService executor; + private final ReadOnlyNodeTypeManager ntMgr; + private final Root root; + private final Map collectors = new HashMap(); + private final AtomicBoolean hasEvents = new AtomicBoolean(false); + + public ObservationManagerImpl(Root root, NamePathMapper namePathMapper, ScheduledExecutorService executor) { + this.namePathMapper = namePathMapper; + this.executor = executor; + this.ntMgr = ReadOnlyNodeTypeManager.getInstance(root, namePathMapper); + this.root = root; + } + + public synchronized void dispose() { + for (EventCollector collector : collectors.values()) { + collector.stop(); + } + collectors.clear(); + } + + /** + * Determine whether events have been generated since the time this method has been called. + * @return {@code true} if this {@code ObservationManager} instance has generated events + * since the last time this method has been called, {@code false} otherwise. + */ + public boolean hasEvents() { + return hasEvents.getAndSet(false); + } + + @Override + public synchronized void addEventListener(EventListener listener, int eventTypes, String absPath, + boolean isDeep, String[] uuid, String[] nodeTypeName, boolean noLocal) throws RepositoryException { + ChangeFilter filter = new ChangeFilter(ntMgr, namePathMapper, eventTypes, + absPath, isDeep, uuid, nodeTypeName, noLocal); + + EventCollector collector = collectors.get(listener); + if (collector == null) { + collector = new EventCollector(this, listener, filter); + collectors.put(listener, collector); + collector.start(executor); + } else { + collector.setFilter(filter); + } + } + + @Override + public synchronized void removeEventListener(EventListener listener) { + EventCollector collector = collectors.remove(listener); + + if (collector != null) { + collector.stop(); + } + } + + @Override + public EventListenerIterator getRegisteredEventListeners() throws RepositoryException { + return new EventListenerIteratorAdapter(collectors.keySet()); + } + + @Override + public void setUserData(String userData) throws RepositoryException { + throw new UnsupportedRepositoryOperationException("User data not supported"); + } + + @Override + public EventJournal getEventJournal() throws RepositoryException { + throw new UnsupportedRepositoryOperationException(); + } + + @Override + public EventJournal getEventJournal(int eventTypes, String absPath, boolean isDeep, String[] uuid, String[] + nodeTypeName) throws RepositoryException { + throw new UnsupportedRepositoryOperationException(); + } + + //------------------------------------------------------------< internal >--- + + void setHasEvents() { + hasEvents.set(true); + } + + public ContentSession getContentSession() { + return root.getContentSession(); + } +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventBundles.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventBundles.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/observation2/EventBundles.java (revision ) @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.plugins.observation2; + +import java.util.Collections; +import java.util.Iterator; + +import javax.jcr.observation.Event; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import org.apache.jackrabbit.oak.api.Root; +import org.apache.jackrabbit.oak.api.Tree; +import org.apache.jackrabbit.oak.api.Type; + +/** + * TODO document + * TODO apply filter (needs associated parent node) + * FIXME don't hard code constants + */ +public class EventBundles { + private final Root root; + private final Tree bundles; + private long nextBundleId = JournalWriterProvider.BUNDLE_ID.get(); + + public EventBundles(Root root) { + this.root = root; + this.bundles = root.getTree("/jcr:system/rep:journal"); + } + + public Iterator getNext() { + root.refresh(); + + Iterator events = getEvents(nextBundleId); + if (events == null) { + return null; + } + + nextBundleId++; + return Iterators.transform(events, new Function() { + @Override + public Event apply(Tree event) { + return createEvent(event); + } + }); + } + + private Iterator getEvents(long next) { + Tree bundle = bundles.getChild(String.valueOf(next)); + return bundle == null + ? null + : bundle.getChildren().iterator(); + } + + private static Event createEvent(Tree event) { + int type = (int) getLong(event, "type"); + String path = getString(event, "path"); // FIXME map to jcr path + String userId = getString(event, "userId"); + String identifier = getString(event, "userData"); + long date = getLong(event, "date"); + String userData = getString(event, "userData"); + return new EventImpl(type, path, userId, identifier, Collections.emptyMap(), date, userData); + } + + private static long getLong(Tree event, String name) { + return event.getProperty(name).getValue(Type.LONG); + } + + private static String getString(Tree event, String name) { + return event.getProperty(name).getValue(Type.STRING); + } +}