Index: src/main/java/org/apache/jackrabbit/core/cluster/ChangeLogRecord.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/cluster/ChangeLogRecord.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/cluster/ChangeLogRecord.java (working copy) @@ -43,6 +43,11 @@ public class ChangeLogRecord extends ClusterRecord { /** + * Identifier: DATE + */ + static final char DATE_IDENTIFIER = 'D'; + + /** * Identifier: NODE. */ static final char NODE_IDENTIFIER = 'N'; @@ -78,6 +83,11 @@ private ChangeLog changes; /** + * The time when the changes happened. Milliseconds since January 1 1970 UTC. + */ + private long timestamp = System.currentTimeMillis(); + + /** * List of EventStates. */ private List events; @@ -96,7 +106,7 @@ * Create a new instance of this class. Used when serializing. * * @param changes changes - * @param list of EventStates + * @param events list of EventStates * @param record record * @param workspace workspace */ @@ -131,6 +141,9 @@ while (identifier != END_MARKER) { switch (identifier) { + case DATE_IDENTIFIER: + readTimestampRecord(); + break; case NODE_IDENTIFIER: readNodeRecord(); break; @@ -158,6 +171,15 @@ } /** + * Reads the timestamp record. + * + * @throws JournalException if an error occurs. + */ + private void readTimestampRecord() throws JournalException { + timestamp = record.readLong(); + } + + /** * Read a node record. * * @throws JournalException if an error occurs @@ -288,6 +310,7 @@ * {@inheritDoc} */ protected void doWrite() throws JournalException { + writeTimestampRecord(); Iterator deletedStates = changes.deletedStates(); while (deletedStates.hasNext()) { ItemState state = (ItemState) deletedStates.next(); @@ -324,6 +347,16 @@ } /** + * Writes the timestamp record. + * + * @throws JournalException if an error occurs. + */ + private void writeTimestampRecord() throws JournalException { + record.writeChar(DATE_IDENTIFIER); + record.writeLong(timestamp); + } + + /** * Write a node record * * @param operation operation @@ -397,9 +430,17 @@ * Return the events. * * @return events - * @return */ public List getEvents() { return Collections.unmodifiableList(events); } + + /** + * Returns the timestamp. + * + * @return the timestamp. + */ + public long getTimestamp() { + return timestamp; + } } Index: src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (working copy) @@ -849,7 +849,8 @@ } } try { - listener.externalUpdate(record.getChanges(), record.getEvents()); + listener.externalUpdate(record.getChanges(), + record.getEvents(), record.getTimestamp()); } catch (RepositoryException e) { String msg = "Unable to deliver update events: " + e.getMessage(); log.error(msg); Index: src/main/java/org/apache/jackrabbit/core/cluster/ClusterRecordDeserializer.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/cluster/ClusterRecordDeserializer.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/cluster/ClusterRecordDeserializer.java (working copy) @@ -40,6 +40,7 @@ case ChangeLogRecord.NODE_IDENTIFIER: case ChangeLogRecord.PROPERTY_IDENTIFIER: case ChangeLogRecord.EVENT_IDENTIFIER: + case ChangeLogRecord.DATE_IDENTIFIER: clusterRecord = new ChangeLogRecord(c, record, workspace); clusterRecord.read(); break; Index: src/main/java/org/apache/jackrabbit/core/cluster/Update.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/cluster/Update.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/cluster/Update.java (working copy) @@ -57,4 +57,11 @@ */ List getEvents(); + /** + * Returns the timestamp whe this update occured. + * + * @return the timestamp whe this update occured. + */ + long getTimestamp(); + } Index: src/main/java/org/apache/jackrabbit/core/cluster/UpdateEventListener.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/cluster/UpdateEventListener.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/cluster/UpdateEventListener.java (working copy) @@ -30,8 +30,10 @@ * * @param changes external changes containing only node and property ids. * @param events events to deliver + * @param timestamp when the change occured. * @throws RepositoryException if the update cannot be processed */ - void externalUpdate(ChangeLog changes, List events) throws RepositoryException; + void externalUpdate(ChangeLog changes, List events, long timestamp) + throws RepositoryException; } Index: src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/journal/AbstractJournal.java (working copy) @@ -230,16 +230,6 @@ } /** - * Return an iterator over all records after the specified revision. - * Subclass responsibility. - * - * @param startRevision start point (exlusive) - * @throws JournalException if an error occurs - */ - protected abstract RecordIterator getRecords(long startRevision) - throws JournalException; - - /** * Lock the journal revision, disallowing changes from other sources until * {@link #unlock has been called, and synchronizes to the latest change. * Index: src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/journal/AppendRecord.java (working copy) @@ -205,6 +205,20 @@ /** * {@inheritDoc} */ + public void writeLong(long n) throws JournalException { + checkOutput(); + + try { + dataOut.writeLong(n); + } catch (IOException e) { + String msg = "I/O error while writing long."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ public void writeString(String s) throws JournalException { checkOutput(); @@ -386,6 +400,10 @@ throw unsupported(); } + public long readLong() throws JournalException { + throw unsupported(); + } + public String readString() throws JournalException { throw unsupported(); } Index: src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java (working copy) @@ -435,7 +435,7 @@ /** * {@inheritDoc} */ - protected RecordIterator getRecords(long startRevision) + public RecordIterator getRecords(long startRevision) throws JournalException { try { @@ -458,6 +458,28 @@ /** * {@inheritDoc} + */ + public RecordIterator getRecords() throws JournalException { + try { + checkConnection(); + + selectRevisionsStmt.clearParameters(); + selectRevisionsStmt.clearWarnings(); + selectRevisionsStmt.setLong(1, Long.MIN_VALUE); + selectRevisionsStmt.execute(); + + return new DatabaseRecordIterator( + selectRevisionsStmt.getResultSet(), getResolver(), getNamePathResolver()); + } catch (SQLException e) { + close(true); + + String msg = "Unable to return record iterator."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} *

* This journal is locked by incrementing the current value in the table * named GLOBAL_REVISION, which effectively write-locks this Index: src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java (working copy) @@ -154,7 +154,7 @@ /** * {@inheritDoc} */ - protected RecordIterator getRecords(long startRevision) + public RecordIterator getRecords(long startRevision) throws JournalException { long stopRevision = getGlobalRevision(); @@ -174,6 +174,32 @@ /** * {@inheritDoc} */ + public RecordIterator getRecords() throws JournalException { + long stopRevision = getGlobalRevision(); + long startRevision = 0; + + RotatingLogFile[] logFiles = RotatingLogFile.listFiles(rootDirectory, basename); + File[] files = new File[logFiles.length]; + for (int i = 0; i < files.length; i++) { + files[i] = logFiles[i].getFile(); + if (i == 0) { + try { + FileRecordLog log = new FileRecordLog(files[i]); + startRevision = log.getPreviousRevision(); + } catch (IOException e) { + String msg = "Unable to read startRevision from first " + + "record log file"; + throw new JournalException(msg, e); + } + } + } + return new FileRecordIterator(files, startRevision, stopRevision, + getResolver(), getNamePathResolver()); + } + + /** + * {@inheritDoc} + */ protected void doLock() throws JournalException { globalRevision.lock(false); } Index: src/main/java/org/apache/jackrabbit/core/journal/Journal.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/journal/Journal.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/journal/Journal.java (working copy) @@ -63,6 +63,7 @@ * Return the record producer for a given identifier. * * @param identifier identifier + * @return the record producer for a given identifier. * @throws JournalException if an error occurs */ RecordProducer getProducer(String identifier) throws JournalException; @@ -79,4 +80,22 @@ * @throws JournalException on error */ public InstanceRevision getInstanceRevision() throws JournalException; + + /** + * Return an iterator over all records after the specified revision. + * + * @param startRevision start point (exlusive) + * @return an iterator over all records after the specified revision. + * @throws JournalException if an error occurs + */ + public RecordIterator getRecords(long startRevision) + throws JournalException; + + /** + * Return an iterator over all available records in the journal. + * + * @return an iterator over all records. + * @throws JournalException if an error occurs + */ + public RecordIterator getRecords() throws JournalException; } Index: src/main/java/org/apache/jackrabbit/core/journal/ReadRecord.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/journal/ReadRecord.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/journal/ReadRecord.java (working copy) @@ -154,6 +154,20 @@ /** * {@inheritDoc} */ + public long readLong() throws JournalException { + consumed = true; + + try { + return dataIn.readLong(); + } catch (IOException e) { + String msg = "I/O error while reading long."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ public String readString() throws JournalException { consumed = true; @@ -241,6 +255,10 @@ throw unsupported(); } + public void writeLong(long n) throws JournalException { + throw unsupported(); + } + public void writeString(String s) throws JournalException { throw unsupported(); } Index: src/main/java/org/apache/jackrabbit/core/journal/Record.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/journal/Record.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/journal/Record.java (working copy) @@ -81,6 +81,14 @@ int readInt() throws JournalException; /** + * Read a long from the underlying stream. + * + * @return long value. + * @throws JournalException if an error occurs + */ + long readLong() throws JournalException; + + /** * Read a string from the underlying stream. * * @return string or null @@ -177,6 +185,14 @@ void writeInt(int n) throws JournalException; /** + * Write a long to the underlying stream. + * + * @param n long + * @throws JournalException if an error occurs + */ + void writeLong(long n) throws JournalException; + + /** * Write a string to the underlying stream. * * @param s string, may be null Index: src/main/java/org/apache/jackrabbit/core/observation/EventConsumer.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/observation/EventConsumer.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/observation/EventConsumer.java (working copy) @@ -238,7 +238,8 @@ return; } // check if filtered iterator has at least one event - EventIterator it = new FilteredEventIterator(events, filter, denied); + EventIterator it = new FilteredEventIterator(events.iterator(), + events.getTimestamp(), filter, denied); if (it.hasNext()) { listener.onEvent(it); } else { Index: src/main/java/org/apache/jackrabbit/core/observation/EventFilter.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/observation/EventFilter.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/observation/EventFilter.java (working copy) @@ -32,7 +32,7 @@ * The EventFilter class implements the filter logic based * on the session's access rights and the specified filter rules. */ -class EventFilter { +public class EventFilter { static final EventFilter BLOCK_ALL = new BlockAllFilter(); @@ -131,6 +131,7 @@ } /** + * TODO: remove this unused method. * Returns the ItemManager associated with this * EventFilter. * Index: src/main/java/org/apache/jackrabbit/core/observation/EventImpl.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/observation/EventImpl.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/observation/EventImpl.java (working copy) @@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory; import javax.jcr.RepositoryException; -import javax.jcr.observation.Event; /** * Implementation of the {@link javax.jcr.observation.Event} and @@ -46,14 +45,14 @@ private final SessionImpl session; /** - * The ItemManager of the session. + * The shared {@link EventState} object. */ - //private final ItemManager itemMgr; + private final EventState eventState; /** - * The shared {@link EventState} object. + * The timestamp of this event. */ - private final EventState eventState; + private final long timestamp; /** * Cached String value of this Event instance. @@ -67,10 +66,12 @@ * @param session the session of the registerd EventListener * where this Event will be delivered to. * @param eventState the underlying EventState. + * @param timestamp the time when the change occured that caused this event. */ - EventImpl(SessionImpl session, EventState eventState) { + EventImpl(SessionImpl session, EventState eventState, long timestamp) { this.session = session; this.eventState = eventState; + this.timestamp = timestamp; } //---------------------------------------------------------------< Event > @@ -96,6 +97,13 @@ return eventState.getUserId(); } + /** + * {@inheritDoc} + */ + public long getDate() { + return timestamp; + } + //-----------------------------------------------------------< EventImpl > /** Index: src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java (revision 0) +++ src/main/java/org/apache/jackrabbit/core/observation/EventJournalImpl.java (revision 0) @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.observation; + +import java.util.List; +import java.util.LinkedList; +import java.util.NoSuchElementException; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.Date; +import java.util.Collections; +import java.text.DateFormat; + +import javax.jcr.observation.Event; +import javax.jcr.observation.EventIterator; + +import org.apache.jackrabbit.api.jsr283.observation.EventJournal; +import org.apache.jackrabbit.core.journal.Journal; +import org.apache.jackrabbit.core.journal.RecordIterator; +import org.apache.jackrabbit.core.journal.JournalException; +import org.apache.jackrabbit.core.journal.Record; +import org.apache.jackrabbit.core.cluster.ClusterRecordDeserializer; +import org.apache.jackrabbit.core.cluster.ClusterRecord; +import org.apache.jackrabbit.core.cluster.ClusterRecordProcessor; +import org.apache.jackrabbit.core.cluster.ChangeLogRecord; +import org.apache.jackrabbit.core.cluster.LockRecord; +import org.apache.jackrabbit.core.cluster.NamespaceRecord; +import org.apache.jackrabbit.core.cluster.NodeTypeRecord; +import org.apache.jackrabbit.core.cluster.WorkspaceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * EventJournalImpl implements the JCR 2.0 {@link EventJournal}. + */ +public class EventJournalImpl implements EventJournal { + + /** + * The logger instance for this class. + */ + private static final Logger log = LoggerFactory.getLogger(EventJournalImpl.class); + + /** + * The minimum buffer size for events in {@link #eventBundleBuffer}. + */ + private static final int MIN_BUFFER_SIZE = 1024; + + /** + * Map of skip maps. Key=Journal, Value=SortedMap + *

+ * Each sorted map has the following structure: + * Key=Long (timestamp), Value=Long (revision) + */ + private static final Map REVISION_SKIP_MAPS = new WeakHashMap(); + + /** + * Last revision seen by this event journal. + */ + private Long lastRevision; + + /** + * The event filter. + */ + private final EventFilter filter; + + /** + * The journal of this repository. + */ + private final Journal journal; + + /** + * The producer id to filter journal records. + */ + private final String producerId; + + /** + * The name of the workspace to filter journal records. + */ + private final String wspName; + + /** + * Buffer of {@link EventBundle}s. + */ + private final List eventBundleBuffer = new LinkedList(); + + /** + * The current position of this iterator. + */ + private long position; + + /** + * Creates a new event journal. + * + * @param filter for filtering the events read from the journal. + * @param journal the cluster journal. + * @param producerId the producer id of the cluster node. + */ + public EventJournalImpl(EventFilter filter, + Journal journal, + String producerId) { + this.filter = filter; + this.journal = journal; + this.producerId = producerId; + this.wspName = filter.getSession().getWorkspace().getName(); + } + + //------------------------< EventJournal >--------------------------------- + + /** + * {@inheritDoc} + */ + public void skipTo(long date) { + long time = System.currentTimeMillis(); + + // get skip map for this journal + SortedMap skipMap = getSkipMap(); + synchronized (skipMap) { + SortedMap head = skipMap.headMap(new Long(time)); + if (!head.isEmpty()) { + eventBundleBuffer.clear(); + lastRevision = (Long) head.get(head.lastKey()); + } + } + + try { + while (hasNext()) { + EventBundle bundle = getCurrentBundle(); + if (bundle.timestamp <= date) { + eventBundleBuffer.remove(0); + } else { + break; + } + } + } finally { + time = System.currentTimeMillis() - time; + log.debug("Skipped event bundles in {} ms.", new Long(time)); + } + } + + //------------------------< EventIterator >--------------------------------- + + /** + * {@inheritDoc} + */ + public Event nextEvent() { + // calling hasNext() will also trigger refill if necessary! + if (!hasNext()) { + throw new NoSuchElementException(); + } + EventBundle bundle = getCurrentBundle(); + // above hasNext() call ensures that there is bundle with an event state + assert bundle != null && bundle.events.hasNext(); + + Event next = (Event) bundle.events.next(); + if (!bundle.events.hasNext()) { + // done with this bundle -> remove from buffer + eventBundleBuffer.remove(0); + } + position++; + return next; + } + + //------------------------< RangeIterator >--------------------------------- + + /** + * {@inheritDoc} + */ + public void skip(long skipNum) { + while (skipNum-- > 0) { + nextEvent(); + } + } + + /** + * @return always -1. + */ + public long getSize() { + return -1; + } + + /** + * {@inheritDoc} + */ + public long getPosition() { + // TODO: what happens to position when skipped + return position; + } + + //--------------------------< Iterator >------------------------------------ + + /** + * @throws UnsupportedOperationException always. + */ + public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * {@inheritDoc} + */ + public boolean hasNext() { + if (!eventBundleBuffer.isEmpty()) { + return true; + } + // try refill + refill(); + // check again + return !eventBundleBuffer.isEmpty(); + } + + /** + * {@inheritDoc} + */ + public Object next() { + return nextEvent(); + } + + //----------------------< ClusterRecordProcessor >-------------------------- + + /** + * Implements {@link ClusterRecordProcessor} and keeps track of the number + * of events read and the timestamp of the last record processed. + */ + private class RecordProcessor implements ClusterRecordProcessor { + + /** + * Number of events read so far. + */ + private int numEvents; + + /** + * The timestamp of the last record processed. + */ + private long lastTimestamp; + + /** + * @return the number of events read so far. + */ + private int getNumEvents() { + return numEvents; + } + + /** + * @return the timestamp of the last record processed. + */ + private long getLastTimestamp() { + return lastTimestamp; + } + + /** + * {@inheritDoc} + */ + public void process(ChangeLogRecord record) { + List events = record.getEvents(); + if (!events.isEmpty()) { + EventBundle bundle = new EventBundle(events, + record.getTimestamp(), filter); + if (bundle.events.hasNext()) { + // only queue bundle if there is an event + eventBundleBuffer.add(bundle); + numEvents += events.size(); + lastTimestamp = record.getTimestamp(); + } + } + } + + public void process(LockRecord record) { + // ignore + } + + public void process(NamespaceRecord record) { + // ignore + } + + public void process(NodeTypeRecord record) { + // ignore + } + + public void process(WorkspaceRecord record) { + // ignore + } + } + + //-------------------------------< internal >------------------------------- + + /** + * @return the current event bundle or null if there is none. + */ + private EventBundle getCurrentBundle() { + while (!eventBundleBuffer.isEmpty()) { + EventBundle bundle = (EventBundle) eventBundleBuffer.get(0); + if (bundle.events.hasNext()) { + return bundle; + } else { + eventBundleBuffer.remove(0); + } + } + return null; + } + + /** + * Refills the {@link #eventBundleBuffer}. + */ + private void refill() { + assert eventBundleBuffer.isEmpty(); + try { + RecordProcessor processor = new RecordProcessor(); + ClusterRecordDeserializer deserializer = new ClusterRecordDeserializer(); + RecordIterator records; + if (lastRevision != null) { + log.debug("refilling event bundle buffer starting at revision {}", + lastRevision); + records = journal.getRecords(lastRevision.longValue()); + } else { + log.debug("refilling event bundle buffer starting at journal beginning"); + records = journal.getRecords(); + } + try { + while (processor.getNumEvents() < MIN_BUFFER_SIZE && records.hasNext()) { + Record record = records.nextRecord(); + if (record.getProducerId().equals(producerId)) { + ClusterRecord cr = deserializer.deserialize(record); + if (!wspName.equals(cr.getWorkspace())) { + continue; + } + cr.process(processor); + lastRevision = new Long(cr.getRevision()); + } + } + + if (processor.getNumEvents() >= MIN_BUFFER_SIZE) { + // remember in skip map + SortedMap skipMap = getSkipMap(); + Long timestamp = new Long(processor.getLastTimestamp()); + synchronized (skipMap) { + if (log.isDebugEnabled()) { + DateFormat df = DateFormat.getDateTimeInstance(); + log.debug("remember record in skip map: {} -> {}", + df.format(new Date(timestamp.longValue())), + lastRevision); + } + skipMap.put(timestamp, lastRevision); + } + } + } finally { + records.close(); + } + } catch (JournalException e) { + log.warn("Unable to read journal records", e); + } + } + + /** + * @return the revision skip map for this journal. + */ + private SortedMap getSkipMap() { + synchronized (REVISION_SKIP_MAPS) { + SortedMap map = (SortedMap) REVISION_SKIP_MAPS.get(journal); + if (map == null) { + map = new TreeMap(); + REVISION_SKIP_MAPS.put(journal, map); + } + return map; + } + } + + /** + * Simple class to associate an {@link EventState} iterator with a timestamp. + */ + private static final class EventBundle { + + /** + * An iterator of {@link Event}s. + */ + final EventIterator events; + + /** + * Timestamp when the events were created. + */ + final long timestamp; + + /** + * Creates a new event bundle. + * + * @param eventStates the {@link EventState}s that belong to this bundle. + * @param timestamp the timestamp when the events were created. + * @param filter the event filter. + */ + private EventBundle(List eventStates, + long timestamp, + EventFilter filter) { + this.events = new FilteredEventIterator(eventStates.iterator(), + timestamp, filter, Collections.EMPTY_SET); + this.timestamp = timestamp; + } + } +} Property changes on: src\main\java\org\apache\jackrabbit\core\observation\EventJournalImpl.java ___________________________________________________________________ Added: svn:eol-style + native Index: src/main/java/org/apache/jackrabbit/core/observation/EventStateCollection.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/observation/EventStateCollection.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/observation/EventStateCollection.java (working copy) @@ -88,6 +88,11 @@ private final Path pathPrefix; /** + * Timestamp when this collection was created. + */ + private long timestamp = System.currentTimeMillis(); + + /** * Creates a new empty EventStateCollection. *

* Because the item state manager in {@link #createEventStates} may represent @@ -434,6 +439,22 @@ } /** + * @return the timestamp when this collection was created. + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Sets a new timestamp for this collection. + * + * @param timestamp the new timestamp value. + */ + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + /** * Returns an iterator over {@link EventState} instance. * * @return an iterator over {@link EventState} instance. Index: src/main/java/org/apache/jackrabbit/core/observation/FilteredEventIterator.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/observation/FilteredEventIterator.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/observation/FilteredEventIterator.java (working copy) @@ -63,21 +63,29 @@ private long pos = 0; /** + * The timestamp when the events occured. + */ + private long timestamp; + + /** * Creates a new FilteredEventIterator. * - * @param c an unmodifiable Collection of {@link javax.jcr.observation.Event}s. + * @param eventStates an iterator over unfiltered {@link EventState}s. + * @param timestamp the time when the event were created. * @param filter only event that pass the filter will be dispatched to the * event listener. * @param denied Set of ItemIds of denied ItemStates * rejected by the AccessManager. If * null no ItemState is denied. */ - public FilteredEventIterator(EventStateCollection c, + public FilteredEventIterator(Iterator eventStates, + long timestamp, EventFilter filter, Set denied) { - actualEvents = c.iterator(); + this.actualEvents = eventStates; this.filter = filter; this.denied = denied; + this.timestamp = timestamp; fetchNext(); } @@ -158,7 +166,8 @@ // check denied set if (denied == null || !denied.contains(state.getTargetId())) { try { - next = filter.blocks(state) ? null : new EventImpl(filter.getSession(), state); + next = filter.blocks(state) ? null : new EventImpl( + filter.getSession(), state, timestamp); } catch (RepositoryException e) { log.error("Exception while applying filter.", e); } Index: src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/observation/ObservationManagerImpl.java (working copy) @@ -102,42 +102,9 @@ boolean noLocal) throws RepositoryException { - // create NodeType instances from names - NodeTypeImpl[] nodeTypes; - if (nodeTypeName == null) { - nodeTypes = null; - } else { - NodeTypeManagerImpl ntMgr = session.getNodeTypeManager(); - nodeTypes = new NodeTypeImpl[nodeTypeName.length]; - for (int i = 0; i < nodeTypes.length; i++) { - nodeTypes[i] = (NodeTypeImpl) ntMgr.getNodeType(nodeTypeName[i]); - } - } - - Path path; - try { - path = session.getQPath(absPath).getNormalizedPath(); - } catch (NameException e) { - String msg = "invalid path syntax: " + absPath; - log.debug(msg); - throw new RepositoryException(msg, e); - } - NodeId[] ids = null; - if (uuid != null) { - ids = new NodeId[uuid.length]; - for (int i = 0; i < uuid.length; i++) { - ids[i] = NodeId.valueOf(uuid[i]); - } - } // create filter - EventFilter filter = new EventFilter(itemMgr, - session, - eventTypes, - path, - isDeep, - ids, - nodeTypes, - noLocal); + EventFilter filter = createEventFilter(eventTypes, absPath, + isDeep, uuid, nodeTypeName, noLocal); dispatcher.addConsumer(new EventConsumer(session, listener, filter)); } @@ -180,6 +147,60 @@ } + /** + * Creates a new event filter with the given restrictions. + * + * @param eventTypes A combination of one or more event type constants encoded as a bitmask. + * @param absPath an absolute path. + * @param isDeep a boolean. + * @param uuid array of UUIDs. + * @param nodeTypeName array of node type names. + * @param noLocal a boolean. + * @return the event filter with the given restrictions. + * @throws RepositoryException if an error occurs. + */ + public EventFilter createEventFilter(int eventTypes, + String absPath, + boolean isDeep, + String[] uuid, + String[] nodeTypeName, + boolean noLocal) + throws RepositoryException { + // create NodeType instances from names + NodeTypeImpl[] nodeTypes; + if (nodeTypeName == null) { + nodeTypes = null; + } else { + NodeTypeManagerImpl ntMgr = session.getNodeTypeManager(); + nodeTypes = new NodeTypeImpl[nodeTypeName.length]; + for (int i = 0; i < nodeTypes.length; i++) { + nodeTypes[i] = (NodeTypeImpl) ntMgr.getNodeType(nodeTypeName[i]); + } + } + + Path path; + try { + path = session.getQPath(absPath).getNormalizedPath(); + } catch (NameException e) { + String msg = "invalid path syntax: " + absPath; + log.debug(msg); + throw new RepositoryException(msg, e); + } + if (!path.isAbsolute()) { + throw new RepositoryException("absPath must be absolute"); + } + NodeId[] ids = null; + if (uuid != null) { + ids = new NodeId[uuid.length]; + for (int i = 0; i < uuid.length; i++) { + ids[i] = NodeId.valueOf(uuid[i]); + } + } + // create filter + return new EventFilter(itemMgr, session, eventTypes, path, + isDeep, ids, nodeTypes, noLocal); + } + //------------------------------------------< EventStateCollectionFactory > /** Index: src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/RepositoryImpl.java (working copy) @@ -2096,11 +2096,14 @@ /** * {@inheritDoc} */ - public void externalUpdate(ChangeLog external, List events) throws RepositoryException { + public void externalUpdate(ChangeLog external, + List events, + long timestamp) throws RepositoryException { try { EventStateCollection esc = new EventStateCollection( getObservationDispatcher(), null, null); esc.addAll(events); + esc.setTimestamp(timestamp); getItemStateProvider().externalUpdate(external, esc); } catch (IllegalStateException e) { Index: src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java (working copy) @@ -515,6 +515,11 @@ private HashMap attributes; /** + * Timestamp when this update was created. + */ + private long timestamp = System.currentTimeMillis(); + + /** * Create a new instance of this class. */ public Update(ChangeLog local, EventStateCollectionFactory factory, @@ -835,6 +840,13 @@ } /** + * {@inheritDoc} + */ + public long getTimestamp() { + return timestamp; + } + + /** * Updates the target node references collections based on the * modifications in the change log (i.e. added/removed/modified * REFERENCE properties). Index: src/main/java/org/apache/jackrabbit/core/version/VersionManagerImpl.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/version/VersionManagerImpl.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/version/VersionManagerImpl.java (working copy) @@ -494,9 +494,11 @@ /** * {@inheritDoc} */ - public void externalUpdate(ChangeLog changes, List events) throws RepositoryException { + public void externalUpdate(ChangeLog changes, List events, long timestamp) + throws RepositoryException { EventStateCollection esc = getEscFactory().createEventStateCollection(null); esc.addAll(events); + esc.setTimestamp(timestamp); sharedStateMgr.externalUpdate(changes, esc); } Index: src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java =================================================================== --- src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java (revision 712651) +++ src/main/java/org/apache/jackrabbit/core/WorkspaceImpl.java (working copy) @@ -17,11 +17,14 @@ package org.apache.jackrabbit.core; import org.apache.jackrabbit.api.JackrabbitWorkspace; +import org.apache.jackrabbit.api.jsr283.observation.EventJournal; import org.apache.jackrabbit.core.config.WorkspaceConfig; import org.apache.jackrabbit.core.lock.LockManager; import org.apache.jackrabbit.core.observation.EventStateCollection; import org.apache.jackrabbit.core.observation.EventStateCollectionFactory; import org.apache.jackrabbit.core.observation.ObservationManagerImpl; +import org.apache.jackrabbit.core.observation.EventJournalImpl; +import org.apache.jackrabbit.core.observation.EventFilter; import org.apache.jackrabbit.core.query.QueryManagerImpl; import org.apache.jackrabbit.core.state.LocalItemStateManager; import org.apache.jackrabbit.core.state.SharedItemStateManager; @@ -31,6 +34,8 @@ import org.apache.jackrabbit.core.xml.ImportHandler; import org.apache.jackrabbit.core.xml.Importer; import org.apache.jackrabbit.core.xml.WorkspaceImporter; +import org.apache.jackrabbit.core.cluster.ClusterNode; +import org.apache.jackrabbit.core.security.principal.AdminPrincipal; import org.apache.jackrabbit.commons.AbstractWorkspace; import org.apache.jackrabbit.spi.commons.conversion.NameException; import org.apache.jackrabbit.spi.Path; @@ -57,6 +62,7 @@ import javax.jcr.version.Version; import javax.jcr.version.VersionException; import javax.jcr.version.VersionHistory; +import javax.security.auth.Subject; import java.util.HashMap; import java.util.Iterator; @@ -732,6 +738,43 @@ } /** + * Returns the event journal for this repository. The events are filtered + * according to the passed criteria. + * + * @param eventTypes A combination of one or more event type constants encoded as a bitmask. + * @param absPath an absolute path. + * @param isDeep a boolean. + * @param uuid array of UUIDs. + * @param nodeTypeName array of node type names. + * @return the event journal for this repository. + * @throws UnsupportedRepositoryOperationException if this repository does + * not support an event journal (cluster journal disabled). + * @throws RepositoryException if another error occurs. + */ + public EventJournal getEventJournal(int eventTypes, + String absPath, + boolean isDeep, + String[] uuid, + String[] nodeTypeName) + throws RepositoryException { + Subject subject = ((SessionImpl) getSession()).getSubject(); + if (subject.getPrincipals(AdminPrincipal.class).isEmpty()) { + throw new RepositoryException("Only administrator session may " + + "access EventJournal"); + } + ClusterNode clusterNode = rep.getClusterNode(); + if (clusterNode == null) { + throw new UnsupportedRepositoryOperationException(); + } + + ObservationManagerImpl obsMgr = (ObservationManagerImpl) session.getWorkspace().getObservationManager(); + EventFilter filter = obsMgr.createEventFilter(eventTypes, absPath, + isDeep, uuid, nodeTypeName, false); + return new EventJournalImpl(filter, clusterNode.getJournal(), + clusterNode.getId()); + } + + /** * {@inheritDoc} */ public synchronized QueryManager getQueryManager() throws RepositoryException { Index: src/test/java/org/apache/jackrabbit/core/cluster/SimpleEventListener.java =================================================================== --- src/test/java/org/apache/jackrabbit/core/cluster/SimpleEventListener.java (revision 712651) +++ src/test/java/org/apache/jackrabbit/core/cluster/SimpleEventListener.java (working copy) @@ -28,7 +28,6 @@ import javax.jcr.nodetype.NoSuchNodeTypeException; import org.apache.jackrabbit.core.NodeId; -import org.apache.jackrabbit.core.cluster.LockEventListener; import org.apache.jackrabbit.core.nodetype.InvalidNodeTypeDefException; import org.apache.jackrabbit.core.nodetype.NodeTypeDef; import org.apache.jackrabbit.core.state.ChangeLog; @@ -413,10 +412,10 @@ /** * {@inheritDoc} */ - public void externalUpdate(ChangeLog changes, List events) + public void externalUpdate(ChangeLog changes, List events, long timestamp) throws RepositoryException { - clusterEvents.add(new UpdateEvent(changes, events)); + clusterEvents.add(new UpdateEvent(changes, events, timestamp)); } @@ -441,14 +440,21 @@ private final transient Map attributes = new HashMap(); /** + * Timestamp when the changes in this update event occured. + */ + private final long timestamp; + + /** * Create a new instance of this class. * * @param changes change log * @param events list of EventStates + * @param timestamp time when the changes in this event occured. */ - public UpdateEvent(ChangeLog changes, List events) { + public UpdateEvent(ChangeLog changes, List events, long timestamp) { this.changes = changes; this.events = events; + this.timestamp = timestamp; } /** @@ -472,6 +478,13 @@ /** * {@inheritDoc} */ + public long getTimestamp() { + return timestamp; + } + + /** + * {@inheritDoc} + */ public void setAttribute(String name, Object value) { attributes.put(name, value); } Index: src/test/java/org/apache/jackrabbit/core/cluster/UpdateEventFactory.java =================================================================== --- src/test/java/org/apache/jackrabbit/core/cluster/UpdateEventFactory.java (revision 712651) +++ src/test/java/org/apache/jackrabbit/core/cluster/UpdateEventFactory.java (working copy) @@ -116,7 +116,7 @@ events.add(createEventState(p2, n2, Event.PROPERTY_REMOVED)); events.add(createEventState(n3, Event.NODE_REMOVED, "{}n3")); - return new UpdateEvent(changes, events); + return new UpdateEvent(changes, events, System.currentTimeMillis()); } @@ -139,6 +139,7 @@ * * @param parentId parent node id * @param name property name + * @return property state. */ protected PropertyState createPropertyState(NodeId parentId, String name) { Name propName = nameFactory.create(name); @@ -176,9 +177,9 @@ /** * Create an event state for a property operation. * - * @param n node state + * @param p property state + * @param parent parent node state * @param type Event.NODE_ADDED or Event.NODE_REMOVED - * @param name property name * @return event state */ protected EventState createEventState(PropertyState p, NodeState parent, int type) { Index: src/test/java/org/apache/jackrabbit/core/journal/MemoryJournal.java =================================================================== --- src/test/java/org/apache/jackrabbit/core/journal/MemoryJournal.java (revision 712651) +++ src/test/java/org/apache/jackrabbit/core/journal/MemoryJournal.java (working copy) @@ -23,11 +23,6 @@ import java.util.ArrayList; import java.util.NoSuchElementException; -import org.apache.jackrabbit.core.journal.AbstractJournal; -import org.apache.jackrabbit.core.journal.AppendRecord; -import org.apache.jackrabbit.core.journal.InstanceRevision; -import org.apache.jackrabbit.core.journal.JournalException; -import org.apache.jackrabbit.core.journal.RecordIterator; import org.apache.jackrabbit.spi.commons.namespace.NamespaceResolver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,7 +144,7 @@ /** * {@inheritDoc} */ - protected RecordIterator getRecords(long startRevision) + public RecordIterator getRecords(long startRevision) throws JournalException { checkState(); @@ -161,6 +156,13 @@ } /** + * {@inheritDoc} + */ + public RecordIterator getRecords() throws JournalException { + return new MemoryRecordIterator(0, records.size()); + } + + /** * Set records. Used to share records between two journal implementations. * * @param records array list that should back up this memory journal Index: src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java =================================================================== --- src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java (revision 0) +++ src/test/java/org/apache/jackrabbit/core/observation/EventJournalTest.java (revision 0) @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.observation; + +import java.util.Set; +import java.util.HashSet; +import java.util.Arrays; + +import javax.jcr.RepositoryException; +import javax.jcr.Node; +import javax.jcr.Session; +import javax.jcr.observation.Event; + +import org.apache.jackrabbit.test.AbstractJCRTest; +import org.apache.jackrabbit.core.WorkspaceImpl; +import org.apache.jackrabbit.api.jsr283.observation.EventJournal; + +/** + * EventJournalTest performs EventJournal tests. + */ +public class EventJournalTest extends AbstractJCRTest { + + private static final int ALL_TYPES = Event.NODE_ADDED | Event.NODE_REMOVED | Event.PROPERTY_ADDED | Event.PROPERTY_CHANGED | Event.PROPERTY_REMOVED; + + private EventJournal journal; + + protected void setUp() throws Exception { + super.setUp(); + journal = getEventJournal(ALL_TYPES, "/", true, null, null); + } + + public void testSkipToNow() throws RepositoryException { + // skip everything + journal.skipTo(System.currentTimeMillis()); + assertFalse(journal.hasNext()); + } + + public void testSkipTo() throws Exception { + long time = System.currentTimeMillis(); + + // add some nodes + Node n1 = testRootNode.addNode(nodeName1); + Node n2 = testRootNode.addNode(nodeName2); + + // make sure some time passed otherwise we might + // skip this change as well. + while (time == System.currentTimeMillis()) { + Thread.sleep(1); + } + + // now save + superuser.save(); + + journal.skipTo(time); + // at least the two added nodes must be returned by the journal + checkJournal(new String[]{n1.getPath(), n2.getPath()}, new String[0]); + } + + public void testLiveJournal() throws RepositoryException { + journal.skipTo(System.currentTimeMillis()); + assertFalse(journal.hasNext()); + + testRootNode.addNode(nodeName1); + superuser.save(); + + assertTrue(journal.hasNext()); + } + + public void testWorkspaceSeparation() throws RepositoryException { + journal.skipTo(System.currentTimeMillis()); + assertFalse(journal.hasNext()); + + Session session = helper.getSuperuserSession(workspaceName); + try { + Node rootNode = session.getRootNode(); + if (rootNode.hasNode(nodeName1)) { + rootNode.getNode(nodeName1).remove(); + } else { + rootNode.addNode(nodeName1); + } + session.save(); + } finally { + session.logout(); + } + + assertFalse(journal.hasNext()); + } + + public void testEventType() throws RepositoryException { + Node n1 = testRootNode.addNode(nodeName1); + + journal = getEventJournal(Event.PROPERTY_ADDED, testRoot, true, null, null); + journal.skipTo(System.currentTimeMillis()); + + superuser.save(); + + checkJournal(new String[]{n1.getPath() + "/" + jcrPrimaryType}, + new String[]{n1.getPath()}); + } + + public void testPath() throws RepositoryException { + Node n1 = testRootNode.addNode(nodeName1); + Node n2 = n1.addNode(nodeName2); + + journal = getEventJournal(ALL_TYPES, n1.getPath(), true, null, null); + journal.skipTo(System.currentTimeMillis()); + + superuser.save(); + + checkJournal(new String[]{n2.getPath()}, new String[]{n1.getPath()}); + } + + public void testIsDeepTrue() throws RepositoryException { + Node n1 = testRootNode.addNode(nodeName1); + Node n2 = n1.addNode(nodeName2); + + journal = getEventJournal(ALL_TYPES, testRoot, true, null, null); + journal.skipTo(System.currentTimeMillis()); + + superuser.save(); + + checkJournal(new String[]{n1.getPath(), n2.getPath()}, new String[0]); + } + + public void testIsDeepFalse() throws RepositoryException { + Node n1 = testRootNode.addNode(nodeName1); + Node n2 = n1.addNode(nodeName2); + + journal = getEventJournal(ALL_TYPES, testRoot, false, null, null); + journal.skipTo(System.currentTimeMillis()); + + superuser.save(); + + checkJournal(new String[]{n1.getPath()}, new String[]{n2.getPath()}); + } + + public void testUUID() throws RepositoryException { + Node n1 = testRootNode.addNode(nodeName1); + if (!n1.isNodeType(mixReferenceable)) { + n1.addMixin(mixReferenceable); + } + superuser.save(); + + Node n2 = n1.addNode(nodeName2); + + journal = getEventJournal(ALL_TYPES, "/", true, new String[]{n1.getUUID()}, null); + journal.skipTo(System.currentTimeMillis()); + + superuser.save(); + + checkJournal(new String[]{n2.getPath()}, new String[0]); + } + + public void testNodeType() throws RepositoryException { + Node n1 = testRootNode.addNode(nodeName1, "nt:folder"); + Node n2 = n1.addNode(nodeName2, "nt:folder"); + + journal = getEventJournal(ALL_TYPES, testRoot, true, null, + new String[]{"nt:folder"}); + journal.skipTo(System.currentTimeMillis()); + + superuser.save(); + + checkJournal(new String[]{n2.getPath()}, new String[]{n1.getPath()}); + } + + //-------------------------------< internal >------------------------------- + + private EventJournal getEventJournal(int eventTypes, + String absPath, + boolean isDeep, + String[] uuid, + String[] nodeTypeName) + throws RepositoryException { + return ((WorkspaceImpl) superuser.getWorkspace()).getEventJournal( + eventTypes, absPath, isDeep, uuid, nodeTypeName); + } + + /** + * Checks the journal for events. + * + * @param allowed allowed paths for the returned events. + * @param denied denied paths for the returned events. + * @throws RepositoryException if an error occurs while reading the event + * journal. + */ + private void checkJournal(String[] allowed, String[] denied) throws RepositoryException { + Set allowedSet = new HashSet(Arrays.asList(allowed)); + Set deniedSet = new HashSet(Arrays.asList(denied)); + while (journal.hasNext()) { + String path = journal.nextEvent().getPath(); + allowedSet.remove(path); + if (deniedSet.contains(path)) { + fail(path + " must not be present in journal"); + } + } + assertTrue("Missing paths in journal: " + allowedSet, allowedSet.isEmpty()); + } +} Property changes on: src\test\java\org\apache\jackrabbit\core\observation\EventJournalTest.java ___________________________________________________________________ Added: svn:eol-style + native Index: src/test/java/org/apache/jackrabbit/core/observation/TestAll.java =================================================================== --- src/test/java/org/apache/jackrabbit/core/observation/TestAll.java (revision 712651) +++ src/test/java/org/apache/jackrabbit/core/observation/TestAll.java (working copy) @@ -40,6 +40,7 @@ suite.addTestSuite(MixinTest.class); suite.addTestSuite(VersionEventsTest.class); suite.addTestSuite(MoveInPlaceTest.class); + suite.addTestSuite(EventJournalTest.class); return suite; } Index: src/test/repository/repository.xml =================================================================== --- src/test/repository/repository.xml (revision 712651) +++ src/test/repository/repository.xml (working copy) @@ -123,4 +123,14 @@ + + + + + + + +