Index: oak-core/src/main/java/org/apache/jackrabbit/oak/core/SimpleCommitContext.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/core/SimpleCommitContext.java	(revision 1758607)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/core/SimpleCommitContext.java	(working copy)
@@ -19,6 +19,7 @@
 
 package org.apache.jackrabbit.oak.core;
 
+import java.io.Serializable;
 import java.util.Map;
 
 import com.google.common.collect.Maps;
@@ -26,7 +27,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-public class SimpleCommitContext implements CommitContext {
+public class SimpleCommitContext implements CommitContext, Serializable {
     private final Map<String, Object> attrs = Maps.newHashMap();
 
     @Override
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	(revision 1758607)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java	(working copy)
@@ -111,6 +111,7 @@
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
+import org.apache.jackrabbit.oak.spi.state.NodeStateSerializer;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
@@ -123,7 +124,7 @@
  * Implementation of a NodeStore on {@link DocumentStore}.
  */
 public final class DocumentNodeStore
-        implements NodeStore, RevisionContext, Observable, Clusterable, NodeStateDiffer {
+        implements NodeStore, RevisionContext, Observable, Clusterable, NodeStateDiffer, NodeStateSerializer {
 
     private static final Logger LOG = LoggerFactory.getLogger(DocumentNodeStore.class);
 
@@ -1674,6 +1675,18 @@
         checkpoints.release(checkpoint);
         return true;
     }
+    
+    @Override
+    public String serialize(NodeState nodeState) {
+    	checkArgument(nodeState instanceof DocumentNodeState);
+    	DocumentNodeState documentNodeState = (DocumentNodeState) nodeState;
+    	return documentNodeState.getRootRevision().asString();
+    }
+    
+    @Override
+    public NodeState deserialize(String serializedNodeStateStr) {
+    	return DocumentNodeState.fromString(this, serializedNodeStateStr);
+    }
 
     //------------------------< RevisionContext >-------------------------------
 
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java	(revision 1758607)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreService.java	(working copy)
@@ -81,6 +81,7 @@
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.spi.blob.stats.BlobStoreStatsMBean;
 import org.apache.jackrabbit.oak.spi.state.Clusterable;
+import org.apache.jackrabbit.oak.spi.state.NodeStateSerializer;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.state.RevisionGC;
 import org.apache.jackrabbit.oak.spi.state.RevisionGCMBean;
@@ -595,7 +596,8 @@
         nodeStoreReg = context.getBundleContext().registerService(
             new String[]{
                  NodeStore.class.getName(), 
-                 DocumentNodeStore.class.getName(), 
+                 DocumentNodeStore.class.getName(),
+                 NodeStateSerializer.class.getName(),
                  Clusterable.class.getName()
             }, 
             nodeStore, props);
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java	(revision 1758607)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java	(working copy)
@@ -19,11 +19,11 @@
 
 package org.apache.jackrabbit.oak.spi.commit;
 
+import static com.google.common.collect.Queues.newArrayBlockingQueue;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.size;
-import static com.google.common.collect.Queues.newArrayBlockingQueue;
 
 import java.io.Closeable;
 import java.lang.Thread.UncaughtExceptionHandler;
@@ -34,12 +34,13 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import com.google.common.base.Predicate;
 import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Predicate;
+
 /**
  * An observer that uses a change queue and a background thread to forward
  * content changes to another observer. The mechanism is designed so that
@@ -83,7 +84,7 @@
     /**
      * The max queue length used for this observer's queue
      */
-    private final int maxQueueLength;
+//    private final int maxQueueLength;
 
     /**
      * Whether external events should be collapsed even if queue isn't full yet.
@@ -91,16 +92,6 @@
     private final boolean alwaysCollapseExternalEvents =
             Boolean.parseBoolean(System.getProperty("oak.observation.alwaysCollapseExternal", "false"));
 
-    private static class ContentChange {
-        private final NodeState root;
-        private final CommitInfo info;
-        private final long created = System.currentTimeMillis();
-        ContentChange(NodeState root, CommitInfo info) {
-            this.root = root;
-            this.info = info;
-        }
-    }
-
     /**
      * The content change that was last added to the queue.
      * Used to compact external changes.
@@ -152,21 +143,46 @@
      */
     private volatile boolean stopped;
 
+    /**
+     * new style of creating a BackgroundObserver - with an EventQueueFactory
+     */
     public BackgroundObserver(
             @Nonnull Observer observer,
             @Nonnull Executor executor,
+            @Nonnull EventQueueFactory eventQueueService,
+            @Nonnull UncaughtExceptionHandler exceptionHandler) {
+        this.observer = checkNotNull(observer);
+        this.executor = checkNotNull(executor);
+        this.exceptionHandler = checkNotNull(exceptionHandler);
+        this.queue = eventQueueService.newQueue();
+    }
+
+    public BackgroundObserver(
+            @Nonnull Observer observer,
+            @Nonnull Executor executor,
             int queueLength,
             @Nonnull UncaughtExceptionHandler exceptionHandler) {
         this.observer = checkNotNull(observer);
         this.executor = checkNotNull(executor);
         this.exceptionHandler = checkNotNull(exceptionHandler);
-        this.maxQueueLength = queueLength;
-        this.queue = newArrayBlockingQueue(maxQueueLength);
+        this.queue = newArrayBlockingQueue(queueLength);
     }
 
     public BackgroundObserver(
             @Nonnull final Observer observer,
             @Nonnull Executor executor,
+            @Nonnull EventQueueFactory eventQueueService) {
+        this(observer, executor, eventQueueService, new UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                getLogger(observer).error("Uncaught exception in " + observer, e);
+            }
+        });
+    }
+
+    public BackgroundObserver(
+            @Nonnull final Observer observer,
+            @Nonnull Executor executor,
             int queueLength) {
         this(observer, executor, queueLength, new UncaughtExceptionHandler() {
             @Override
@@ -201,7 +217,7 @@
      * @return  The max queue length used for this observer's queue
      */
     public int getMaxQueueLength() {
-        return maxQueueLength;
+    	return queue.size() + queue.remainingCapacity();
     }
 
     /**
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java	(revision 1758607)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfo.java	(working copy)
@@ -49,7 +49,7 @@
 
     private final String userId;
 
-    private final long date = System.currentTimeMillis();
+    private final long date;
 
     private final Map<String, Object> info;
 
@@ -61,9 +61,14 @@
      */
     public CommitInfo(
             @Nonnull String sessionId, @Nullable String userId) {
-        this(sessionId, userId, Collections.<String, Object>emptyMap());
+        this(sessionId, userId, Collections.<String, Object>emptyMap(), System.currentTimeMillis());
     }
 
+    public CommitInfo(
+            @Nonnull String sessionId, @Nullable String userId, long date) {
+        this(sessionId, userId, Collections.<String, Object>emptyMap(), date);
+    }
+
     /**
      * Creates a commit info for the given session and user and info map.
      *
@@ -72,9 +77,14 @@
      * @param info info map
      */
     public CommitInfo(@Nonnull String sessionId, @Nullable String userId, Map<String, Object> info) {
+        this(sessionId, userId, info, System.currentTimeMillis());
+    }
+
+    public CommitInfo(@Nonnull String sessionId, @Nullable String userId, Map<String, Object> info, long date) {
         this.sessionId = checkNotNull(sessionId);
         this.userId = (userId == null) ? OAK_UNKNOWN : userId;
         this.info = checkNotNull(info);
+        this.date = date;
     }
 
     /**
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfoCodec.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfoCodec.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfoCodec.java	(working copy)
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.jackrabbit.oak.api.Blob;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class for PersistedBlockingQueue which takes care of the
+ * encoding/decoding of CommitInfo into oak's nodes and properties.
+ */
+public class CommitInfoCodec {
+
+    private static final Logger log = LoggerFactory.getLogger(CommitInfoCodec.class);
+
+    private static final String AUTHENTICATION_TOKEN_COMMIT_MARKER = "org.apache.jackrabbit.oak.security.authentication.token.CommitMarker";
+
+    private static final Set<String> IGNORED_PROPERTIES = new HashSet<String>(
+            Arrays.asList(AUTHENTICATION_TOKEN_COMMIT_MARKER));
+
+    private static final String PROPERTY_NAME_SERIALIZED_ROOT = "root";
+    private static final String PROPERTY_NAME_SERIALIZED_VALUE = "serializedValue";
+
+    private static final String CHILD_NODE_COMMIT_INFO_INFO_MAP = "infoMap";
+    private static final String PROPERTY_NAME_COMMIT_INFO_DATE = "date";
+    private static final String PROPERTY_NAME_COMMIT_INFO_USER_ID = "userId";
+    private static final String PROPERTY_NAME_COMMIT_INFO_SESSION_ID = "sessionId";
+
+    static boolean writeCommitInfo(ContentChange contentChange, NodeBuilder nodeBuilder,
+            NodeStateSerializer serializer) {
+        if (contentChange.info != null) {
+            final CommitInfo ci = contentChange.info;
+            nodeBuilder.setProperty(PROPERTY_NAME_COMMIT_INFO_SESSION_ID, ci.getSessionId());
+            nodeBuilder.setProperty(PROPERTY_NAME_COMMIT_INFO_USER_ID, ci.getUserId());
+            nodeBuilder.setProperty(PROPERTY_NAME_COMMIT_INFO_DATE, ci.getDate());
+            final NodeBuilder commitInfoInfoMap = nodeBuilder.child(CHILD_NODE_COMMIT_INFO_INFO_MAP);
+            final Set<Entry<String, Object>> entries = ci.getInfo().entrySet();
+            for (Entry<String, Object> infoMapEntry : entries) {
+                final String infoName = infoMapEntry.getKey();
+                if (IGNORED_PROPERTIES.contains(infoName)) {
+                    // TODO: make this a configurable list here instead of just
+                    // hardcoding
+                    continue;
+                }
+                final Object infoValue = infoMapEntry.getValue();
+                if (infoValue instanceof String) {
+                    commitInfoInfoMap.setProperty(infoName, (String) infoValue);
+                } else if (infoValue instanceof Long) {
+                    commitInfoInfoMap.setProperty(infoName, (long) infoValue);
+                } else if (infoValue instanceof Integer || infoValue instanceof Short) {
+                    // short is converted to a long
+                    commitInfoInfoMap.setProperty(infoName, (int) infoValue);
+                } else if (infoValue instanceof Double || infoValue instanceof Float) {
+                    // float is converted to a double
+                    commitInfoInfoMap.setProperty(infoName, (double) infoValue);
+                } else if (infoValue instanceof Boolean) {
+                    commitInfoInfoMap.setProperty(infoName, (boolean) infoValue);
+                } else if (infoValue instanceof byte[]) {
+                    commitInfoInfoMap.setProperty(infoName, (byte[]) infoValue);
+                } else if (infoValue instanceof Blob) {
+                    commitInfoInfoMap.setProperty(infoName, (Blob) infoValue);
+                } else if (infoValue instanceof BigDecimal) {
+                    commitInfoInfoMap.setProperty(infoName, (BigDecimal) infoValue);
+                } else if (infoValue instanceof Serializable) {
+                    // note that Externalizable is not treated differently here
+                    // but falls also under Serializable
+                    try {
+                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                        ObjectOutputStream oos = new ObjectOutputStream(baos);
+                        oos.writeObject(infoValue);
+                        oos.flush();
+                        baos.flush();
+                        commitInfoInfoMap.child(infoName).setProperty(PROPERTY_NAME_SERIALIZED_VALUE,
+                                baos.toByteArray());
+                    } catch (Exception e) {
+                        log.error("writeCommitInfo: could not serialize: " + infoName + ", e=" + e, e);
+                    }
+                } else {
+                    log.error("writeCommitInfo: unsupported CommitInfo info-map type: " + infoValue.getClass());
+                }
+            }
+        }
+        String serializedRoot = serializer.serialize(contentChange.root);
+        if (serializedRoot == null) {
+            // serialize never returns null
+            log.error("writeCommitInfo: could not serialize a contentChange: " + contentChange);
+            return false;
+        }
+
+        nodeBuilder.setProperty(PROPERTY_NAME_SERIALIZED_ROOT, serializedRoot);
+        return true;
+    }
+
+    static ContentChange readCommitInfo(final NodeState nodeState, final NodeStateSerializer serializer) {
+        Map<String, Object> infoMap = null;
+        CommitInfo ci = CommitInfo.EMPTY;
+        if (nodeState.hasProperty(PROPERTY_NAME_COMMIT_INFO_SESSION_ID)) {
+            final String sessionId = nodeState.getProperty(PROPERTY_NAME_COMMIT_INFO_SESSION_ID).getValue(Type.STRING);
+            final String userId = nodeState.getProperty(PROPERTY_NAME_COMMIT_INFO_USER_ID).getValue(Type.STRING);
+            final long date = nodeState.getProperty(PROPERTY_NAME_COMMIT_INFO_DATE).getValue(Type.LONG);
+            if (nodeState.hasChildNode(CHILD_NODE_COMMIT_INFO_INFO_MAP)) {
+                NodeState infoNode = nodeState.getChildNode(CHILD_NODE_COMMIT_INFO_INFO_MAP);
+                infoMap = new HashMap<String, Object>();
+                Iterator<? extends PropertyState> props = infoNode.getProperties().iterator();
+                while (props.hasNext()) {
+                    PropertyState prop = props.next();
+                    String infoName = prop.getName();
+                    if (prop.getType() == Type.STRING) {
+                        infoMap.put(infoName, prop.getValue(Type.STRING));
+                    } else if (prop.getType() == Type.BOOLEAN) {
+                        infoMap.put(infoName, prop.getValue(Type.BOOLEAN));
+                    } else if (prop.getType() == Type.BINARY) {
+                        infoMap.put(infoName, prop.getValue(Type.BINARY));
+                    } else if (prop.getType() == Type.LONG) {
+                        infoMap.put(infoName, prop.getValue(Type.LONG));
+                    } else if (prop.getType() == Type.DOUBLE) {
+                        infoMap.put(infoName, prop.getValue(Type.DOUBLE));
+                    } else {
+                        log.warn("readCommitInfo: unsupported info-map type: " + prop.getType());
+                    }
+                }
+                final Iterator<? extends ChildNodeEntry> childIt = infoNode.getChildNodeEntries().iterator();
+                while (childIt.hasNext()) {
+                    final ChildNodeEntry child = childIt.next();
+                    try {
+                        Blob blob = child.getNodeState().getProperty(PROPERTY_NAME_SERIALIZED_VALUE)
+                                .getValue(Type.BINARY);
+                        ObjectInputStream ois = new ObjectInputStream(blob.getNewStream());
+                        Object obj = ois.readObject();
+                        infoMap.put(child.getName(), obj);
+                    } catch (Exception e) {
+                        log.error("readCommitInfo: cannot read serialized object " + child.getName() + ", e=" + e, e);
+                    }
+                }
+            }
+            if (infoMap == null) {
+                ci = new CommitInfo(sessionId, userId, date);
+            } else {
+                ci = new CommitInfo(sessionId, userId, infoMap, date);
+            }
+        }
+
+        PropertyState rootProperty = nodeState.getProperty(PROPERTY_NAME_SERIALIZED_ROOT);
+        NodeState changeRoot;
+        if (rootProperty == null) {
+            log.warn("readCommitInfo: property '" + PROPERTY_NAME_SERIALIZED_ROOT
+                    + "' is not set, cannot read observation event");
+        }
+        String serializedRoot = rootProperty.getValue(Type.STRING);
+        try {
+            changeRoot = serializer.deserialize(serializedRoot);
+        } catch (Exception e) {
+            log.warn("readCommitInfo: could not read serialized root: " + serializedRoot + ", e=" + e, e);
+            return null;
+        }
+        ContentChange change = new ContentChange(changeRoot, ci);
+        return change;
+    }
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/CommitInfoCodec.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ContentChange.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ContentChange.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ContentChange.java	(working copy)
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+
+class ContentChange {
+
+    final NodeState root;
+    final CommitInfo info;
+    // TODO: this could be taken from CommitInfo.getDate():
+    final long created = System.currentTimeMillis();
+
+    ContentChange(NodeState root, CommitInfo info) {
+        this.root = root;
+        this.info = info;
+    }
+}
\ No newline at end of file

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/ContentChange.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/EventQueueFactory.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/EventQueueFactory.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/EventQueueFactory.java	(working copy)
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Capable of creating a new BlockingQueue.
+ * <p>
+ * Typical implementation would be an in-memory limited queue or a persistent
+ * unlimited queue.
+ */
+public interface EventQueueFactory {
+
+    BlockingQueue<ContentChange> newQueue();
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/EventQueueFactory.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/InMemoryLimitedEventQueueFactory.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/InMemoryLimitedEventQueueFactory.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/InMemoryLimitedEventQueueFactory.java	(working copy)
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import static com.google.common.collect.Queues.newArrayBlockingQueue;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Service;
+
+@Component(policy = ConfigurationPolicy.REQUIRE, metatype = true, label = "Apache Jackrabbit Oak In-Memory Event-Queue Factory", description = "In-Memory, limited queue factory (which is limited to avoid OOME)")
+@Service
+public class InMemoryLimitedEventQueueFactory implements EventQueueFactory {
+
+    private int maxQueueLength;
+
+    /** OSGi constructor **/
+    public InMemoryLimitedEventQueueFactory() {
+        maxQueueLength = 50000;
+    }
+
+    /** explicit constructor **/
+    public InMemoryLimitedEventQueueFactory(int maxQueueLength) {
+        this.maxQueueLength = maxQueueLength;
+    }
+
+    @Override
+    public BlockingQueue<ContentChange> newQueue() {
+        return newArrayBlockingQueue(maxQueueLength);
+    }
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/InMemoryLimitedEventQueueFactory.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/PersistedBlockingQueue.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/PersistedBlockingQueue.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/PersistedBlockingQueue.java	(working copy)
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.oak.plugins.commit.ConflictHook;
+import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
+import org.apache.jackrabbit.oak.plugins.commit.JcrConflictHandler;
+import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateSerializer;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * WORK IN PROGRESS: This is an early stage of a persisting blocking queue which
+ * has two (meant-to-be-small) buffers at both ends: a tail where entries are
+ * added, and a head where entries are taken from. Whenever the tail overfills
+ * or the head underfills, the tail is flushed to the provided store. Whenever
+ * the head underfills, it first flushes the tail (as just mentioned) and then
+ * reads from store. UNOPTIMIZED: the idea is that the store isn't used when
+ * possible as long as things fit into memory. But this variant just show-cases
+ * the usage of the store, so always goes via store.
+ */
+class PersistedBlockingQueue implements BlockingQueue<ContentChange> {
+
+    private static final String PROPERTY_NAME_ADD_CNT = "addCnt";
+    private static final String PROPERTY_NAME_FLUSH_CNT = "flushCnt";
+    private static final String PROPERTY_NAME_REMOVE_CNT = "removeCnt";
+    private static final String PROPERTY_NAME_READ_CNT = "readCnt";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final static int IN_MEMORY_HEAD_BUFFER_SIZE = 100;
+    private final static int IN_MEMORY_TAIL_BUFFER_SIZE = 100;
+
+    private final List<ContentChange> headBuffer = new LinkedList<ContentChange>();
+    private final List<ContentChange> tailBuffer = new LinkedList<ContentChange>();
+
+    private int size;
+    private int modCnt;
+
+    private final NodeStore observationStore;
+    private final NodeStateSerializer nodeStateSerializer;
+
+    private final String qid = UUID.randomUUID().toString();
+    private long qidx = 0;
+    private int addCnt = 0;
+    private int flushCnt = 0;
+    private int removeCnt = 0;
+    private int readCnt = 0;
+    private final String[] obsQueuePathElements = { "oak", "observation-queues", qid };
+
+    public PersistedBlockingQueue(NodeStore observationStore, NodeStateSerializer nodeStateSerializer) {
+        this.observationStore = observationStore;
+        this.nodeStateSerializer = nodeStateSerializer;
+        init();
+    }
+
+    private NodeBuilder createChildrenHierarchy(NodeBuilder parent, String... children) {
+        for (String aChild : children) {
+            parent = parent.child(aChild);
+        }
+        return parent;
+    }
+
+    private void init() {
+        while (true) {
+            try {
+                final NodeBuilder rootBuilder = observationStore.getRoot().builder();
+                createChildrenHierarchy(rootBuilder, obsQueuePathElements);
+                CommitHook hook = new CompositeHook(new ConflictHook(JcrConflictHandler.createJcrConflictHandler()),
+                        new EditorHook(new ConflictValidatorProvider()));
+                observationStore.merge(rootBuilder, hook, CommitInfo.EMPTY);
+                break;
+            } catch (Exception e) {
+                log.error("flushTailBuffer: concurrent creation of /oak/observation-queues/" + qid
+                        + " - retrying in 1sec");
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e1) {
+                    log.warn("flushTailBuffer: got interrupted: " + e1, e1);
+                }
+                continue;
+            }
+        }
+    }
+
+    private NodeBuilder getChild(NodeBuilder parent, String... children) {
+        NodeBuilder childBuilder = parent;
+        for (String aChild : children) {
+            if (!childBuilder.hasChildNode(aChild)) {
+                return null;
+            }
+            childBuilder = childBuilder.child(aChild);
+        }
+        return childBuilder;
+    }
+
+    private void refillHeadBuffer() {
+        if (tailBuffer.size() > 0) {
+            flushTailBuffer();
+        }
+        if (headBuffer.size() > 0) {
+            return;
+        }
+        final NodeState rootRoot = observationStore.getRoot();
+        NodeBuilder rootBuilder = rootRoot.builder();
+        NodeBuilder queueBuilder = getChild(rootBuilder, obsQueuePathElements);
+        if (queueBuilder == null) {
+            log.error("refillHeadBuffer: could not read /oak/observation-queues/" + qid);
+            return;
+        }
+        queueBuilder.setProperty(PROPERTY_NAME_READ_CNT, ++readCnt);
+        // otherwise yes we have something
+        Iterator<? extends ChildNodeEntry> it = queueBuilder.getNodeState().getChildNodeEntries().iterator();
+        int cnt = 0;
+        List<ContentChange> readList = new LinkedList<ContentChange>();
+        while (it.hasNext()) {
+            final ChildNodeEntry entry = it.next();
+            final NodeState nodeState = entry.getNodeState();
+            final String entryName = entry.getName();
+            ContentChange change = CommitInfoCodec.readCommitInfo(nodeState, nodeStateSerializer);
+            // remove entry irrespective of whether reading it was successful or
+            // not
+            // TODO: perhaps we should reconsider this and leave the entry in
+            // the queue
+            // but then again, how would a cleanup mechanism look like it this
+            // is a reoccurring issue
+            queueBuilder.child(entryName).remove();
+            removeCnt++;
+            if (change != null) {
+                readList.add(change);
+                if (++cnt >= IN_MEMORY_HEAD_BUFFER_SIZE) {
+                    break;
+                }
+            }
+        }
+        queueBuilder.setProperty(PROPERTY_NAME_REMOVE_CNT, removeCnt);
+        try {
+            CommitHook hook = new CompositeHook(new ConflictHook(JcrConflictHandler.createJcrConflictHandler()),
+                    new EditorHook(new ConflictValidatorProvider()));
+            observationStore.merge(rootBuilder, hook, CommitInfo.EMPTY);
+        } catch (Exception e) {
+            log.error("refillHeadBuffer: could not merge deletion of nodes: " + e, e);
+            return;
+        }
+        headBuffer.addAll(readList);
+    }
+
+    private void flushTailBuffer() {
+        while (true) {
+            try {
+                final NodeBuilder rootBuilder = observationStore.getRoot().builder();
+                final NodeBuilder queueNode = getChild(rootBuilder, obsQueuePathElements);
+                queueNode.setProperty(PROPERTY_NAME_FLUSH_CNT, ++flushCnt);
+                for (ContentChange contentChange : tailBuffer) {
+                    NodeBuilder entry = queueNode.child(String.valueOf(qidx++));
+                    CommitInfoCodec.writeCommitInfo(contentChange, entry, nodeStateSerializer);
+                    addCnt++;
+                }
+                queueNode.setProperty(PROPERTY_NAME_ADD_CNT, addCnt);
+                CommitHook hook = new CompositeHook(new ConflictHook(JcrConflictHandler.createJcrConflictHandler()),
+                        new EditorHook(new ConflictValidatorProvider()));
+                observationStore.merge(rootBuilder, hook, CommitInfo.EMPTY);
+                break;
+            } catch (Exception e) {
+                log.error("flushTailBuffer: unexpected exception when flushing buffer, retrying in 1sec, e=" + e, e);
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e1) {
+                    log.warn("flushTailBuffer: got interrupted: " + e1, e1);
+                }
+                continue;
+            }
+        }
+        tailBuffer.clear();
+    }
+
+    @Override
+    public synchronized ContentChange remove() {
+        if (size == 0) {
+            throw new NoSuchElementException();
+        }
+        return poll();
+    }
+
+    @Override
+    public synchronized ContentChange poll() {
+        if (size == 0) {
+            return null;
+        }
+        if (headBuffer.isEmpty()) {
+            refillHeadBuffer();
+        }
+        size--;
+        return headBuffer.remove(0);
+    }
+
+    @Override
+    public synchronized ContentChange element() {
+        if (size == 0) {
+            throw new NoSuchElementException();
+        }
+        return peek();
+    }
+
+    @Override
+    public synchronized ContentChange peek() {
+        if (size == 0) {
+            return null;
+        }
+        if (headBuffer.isEmpty()) {
+            refillHeadBuffer();
+        }
+        return headBuffer.get(0);
+    }
+
+    @Override
+    public synchronized int size() {
+        return size;
+    }
+
+    @Override
+    public synchronized boolean isEmpty() {
+        return size == 0;
+    }
+
+    @Override
+    public synchronized Iterator<ContentChange> iterator() {
+        return new Iterator<ContentChange>() {
+
+            int idx = 0;
+            int itStartModCnt = modCnt;
+
+            @Override
+            public boolean hasNext() {
+                synchronized (PersistedBlockingQueue.this) {
+                    if (modCnt != itStartModCnt) {
+                        throw new ConcurrentModificationException();
+                    }
+                    return idx < size;
+                }
+            }
+
+            @Override
+            public ContentChange next() {
+                synchronized (PersistedBlockingQueue.this) {
+                    if (modCnt != itStartModCnt) {
+                        throw new ConcurrentModificationException();
+                    }
+                    if (idx < headBuffer.size()) {
+                        return headBuffer.get(idx++);
+                    } else {
+                        return tailBuffer.get((idx++) - headBuffer.size());
+                    }
+                }
+            }
+
+            @Override
+            public void remove() {
+                synchronized (PersistedBlockingQueue.this) {
+                    if (modCnt != itStartModCnt) {
+                        throw new ConcurrentModificationException();
+                    }
+                    if (idx == headBuffer.size()) {
+                        headBuffer.remove(idx - 1);
+                    } else {
+                        tailBuffer.remove(idx - 1 - headBuffer.size());
+                    }
+                }
+            }
+        };
+    }
+
+    @Override
+    public Object[] toArray() {
+        throw new IllegalStateException("nyi");
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new IllegalStateException("nyi");
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        throw new IllegalStateException("nyi");
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends ContentChange> c) {
+        throw new IllegalStateException("nyi");
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new IllegalStateException("nyi");
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new IllegalStateException("nyi");
+    }
+
+    @Override
+    public void clear() {
+        throw new IllegalStateException("nyi");
+    }
+
+    @Override
+    public synchronized boolean add(ContentChange e) {
+        try {
+            put(e);
+        } catch (InterruptedException e1) {
+            log.error("add: got interrupted adding: " + e1, e1);
+            throw new IllegalStateException(e1);
+        }
+        return true;
+    }
+
+    @Override
+    public synchronized boolean offer(ContentChange e) {
+        try {
+            put(e);
+        } catch (InterruptedException e1) {
+            log.error("add: got interrupted adding: " + e1, e1);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public synchronized void put(ContentChange e) throws InterruptedException {
+        // if buffer was previously at limit, then flush it first
+        if (tailBuffer.size() >= IN_MEMORY_TAIL_BUFFER_SIZE) {
+            flushTailBuffer();
+        }
+        tailBuffer.add(e);
+        size++;
+        // if buffer is now - after adding - at limit, then flush now
+        if (tailBuffer.size() > IN_MEMORY_TAIL_BUFFER_SIZE) {
+            flushTailBuffer();
+        }
+    }
+
+    @Override
+    public boolean offer(ContentChange e, long timeout, TimeUnit unit) throws InterruptedException {
+        throw new IllegalStateException("nyi");
+    }
+
+    @Override
+    public synchronized ContentChange take() throws InterruptedException {
+        throw new IllegalStateException("nyi");
+    }
+
+    @Override
+    public synchronized ContentChange poll(long timeout, TimeUnit unit) throws InterruptedException {
+        throw new IllegalStateException("nyi");
+    }
+
+    @Override
+    public int remainingCapacity() {
+        return Integer.MAX_VALUE;
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        throw new IllegalStateException("nyi");
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        throw new IllegalStateException("nyi");
+    }
+
+    @Override
+    public int drainTo(Collection<? super ContentChange> c) {
+        throw new IllegalStateException("nyi");
+    }
+
+    @Override
+    public int drainTo(Collection<? super ContentChange> c, int maxElements) {
+        throw new IllegalStateException("nyi");
+    }
+
+}
\ No newline at end of file

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/PersistedBlockingQueue.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/PersistedEventQueueFactory.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/PersistedEventQueueFactory.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/PersistedEventQueueFactory.java	(working copy)
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.spi.commit;
+
+import static com.google.common.collect.Queues.newArrayBlockingQueue;
+import static org.apache.jackrabbit.oak.osgi.OsgiUtil.lookupConfigurationThenFramework;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.jackrabbit.oak.spi.state.NodeStateSerializer;
+import org.apache.jackrabbit.oak.spi.state.NodeStoreProvider;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(policy = ConfigurationPolicy.REQUIRE, metatype = true, label = "Apache Jackrabbit Oak Persisted Event-Queue Factory", description = "bla bla")
+@Service
+public class PersistedEventQueueFactory implements EventQueueFactory {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(target = "(role=observation)", cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private NodeStoreProvider observationStoreProvider;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private NodeStateSerializer nodeStateSerializer;
+
+    private ComponentContext context;
+
+    public PersistedEventQueueFactory() {
+
+    }
+
+    @Activate
+    protected void activate(ComponentContext context) throws Exception {
+        this.context = context;
+    }
+
+    @Override
+    public BlockingQueue<ContentChange> newQueue() {
+        if (observationStoreProvider == null) {
+            return newArrayBlockingQueue(45434);
+        } else {
+            return new PersistedBlockingQueue(observationStoreProvider.getNodeStore(), nodeStateSerializer);
+        }
+    }
+
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/PersistedEventQueueFactory.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/NodeStateSerializer.java
===================================================================
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/NodeStateSerializer.java	(revision 0)
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/NodeStateSerializer.java	(working copy)
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.spi.state;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+
+/**
+ * 'Add-on' interface to a NodeStore which adds the capability
+ * of serializing / deserializing NodeStates. 
+ * <p>
+ * The user has to take care that a later deserialized NodeState
+ * is 'possible' and doesn't refer to a garbage collected
+ * item.
+ */
+public interface NodeStateSerializer {
+
+    /**
+     * Best-effort: serializes the provided NodeState into a string representation
+     * (typically some sort of identifier) that can later be used to deserialize 
+     * back into a NodeState.
+     * <p>
+     * If NodeStore implementations wish not to support this, they can simply 
+     * not implement NodeStateSerializer. The support of serialize/deserialize
+     * doesn't have any implications on the availability (ie not-garbage-collected)
+     * of NodeStates and must be assured by the user of this interface by other means.
+     * @param nodeState
+     * @return
+     */
+	@Nonnull
+    String serialize(@Nonnull NodeState nodeState);
+    
+	/**
+     * @return a NodeState that is equal to the one previously
+     * 		   passed to serialize - or {@code null} if it 
+     *         is no longer available
+     */
+    @CheckForNull
+    NodeState deserialize(@Nonnull String serializedNodeStateStr);
+    
+}

Property changes on: oak-core/src/main/java/org/apache/jackrabbit/oak/spi/state/NodeStateSerializer.java
___________________________________________________________________
Added: svn:eol-style
## -0,0 +1 ##
+native
\ No newline at end of property
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java	(revision 1758575)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/Jcr.java	(working copy)
@@ -53,6 +53,7 @@
 import org.apache.jackrabbit.oak.spi.commit.CompositeConflictHandler;
 import org.apache.jackrabbit.oak.spi.commit.Editor;
 import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
+import org.apache.jackrabbit.oak.spi.commit.EventQueueFactory;
 import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.commit.PartialConflictHandler;
 import org.apache.jackrabbit.oak.spi.lifecycle.RepositoryInitializer;
@@ -105,6 +106,8 @@
     
     private Clusterable clusterable;
 
+	private EventQueueFactory eventQueueFactory;
+
     public Jcr(Oak oak, boolean initialize) {
         this.oak = oak;
 
@@ -261,6 +264,13 @@
         this.commitRateLimiter = checkNotNull(commitRateLimiter);
         return this;
     }
+    
+    @Nonnull
+    public Jcr with(@Nonnull EventQueueFactory eventQueueFactory) {
+        ensureRepositoryIsNotCreated();
+        this.eventQueueFactory = checkNotNull(eventQueueFactory);
+        return this;
+    }
 
     @Nonnull
     public Jcr with(@Nonnull QueryEngineSettings qs) {
@@ -383,7 +393,8 @@
                     securityProvider,
                     observationQueueLength,
                     commitRateLimiter,
-                    fastQueryResultSize);
+                    fastQueryResultSize,
+            		eventQueueFactory);
         }
         return repository;
     }
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java	(revision 1758575)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java	(working copy)
@@ -37,9 +37,6 @@
 import javax.jcr.observation.EventIterator;
 import javax.jcr.observation.EventListener;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Monitor;
-import com.google.common.util.concurrent.Monitor.Guard;
 import org.apache.jackrabbit.api.jmx.EventListenerMBean;
 import org.apache.jackrabbit.commons.observation.ListenerTracker;
 import org.apache.jackrabbit.oak.api.ContentSession;
@@ -52,20 +49,26 @@
 import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
 import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EventQueueFactory;
+import org.apache.jackrabbit.oak.spi.commit.InMemoryLimitedEventQueueFactory;
 import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
 import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
+import org.apache.jackrabbit.oak.stats.MeterStats;
 import org.apache.jackrabbit.oak.stats.StatisticManager;
-import org.apache.jackrabbit.oak.stats.MeterStats;
 import org.apache.jackrabbit.oak.stats.TimerStats;
 import org.apache.jackrabbit.oak.util.PerfLogger;
 import org.apache.jackrabbit.stats.TimeSeriesMax;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Monitor;
+import com.google.common.util.concurrent.Monitor.Guard;
+
 /**
  * A {@code ChangeProcessor} generates observation {@link javax.jcr.observation.Event}s
  * based on a {@link FilterProvider filter} and delivers them to an {@link EventListener}.
@@ -146,6 +149,8 @@
     private CompositeRegistration registration;
 
     private volatile NodeState previousRoot;
+    
+	private final EventQueueFactory eventQueueFactory;
 
     public ChangeProcessor(
             ContentSession contentSession,
@@ -154,7 +159,8 @@
             FilterProvider filter,
             StatisticManager statisticManager,
             int queueLength,
-            CommitRateLimiter commitRateLimiter) {
+            CommitRateLimiter commitRateLimiter,
+            EventQueueFactory eventQueueFactory) {
         this.contentSession = contentSession;
         this.namePathMapper = namePathMapper;
         this.tracker = tracker;
@@ -165,6 +171,11 @@
         this.maxQueueLength = statisticManager.maxQueLengthRecorder();
         this.queueLength = queueLength;
         this.commitRateLimiter = commitRateLimiter;
+        if (eventQueueFactory == null) {
+        	this.eventQueueFactory = new InMemoryLimitedEventQueueFactory(queueLength);
+        } else {
+        	this.eventQueueFactory = eventQueueFactory;
+        }
     }
 
     /**
@@ -221,7 +232,7 @@
     }
 
     private BackgroundObserver createObserver(final WhiteboardExecutor executor) {
-        return new BackgroundObserver(this, executor, queueLength) {
+        return new BackgroundObserver(this, executor, eventQueueFactory) {
             private volatile long delay;
             private volatile boolean blocking;
 
@@ -240,7 +251,7 @@
                         LOG.warn("Revision queue is full. Further revisions will be compacted.");
                     }
                     blocking = true;
-                } else {
+                } else if (queueLength != -1) { // OAK-4581 in persisted event queue case queueLength is -1 == unlimited
                     double fillRatio = (double) queueSize / queueLength;
                     if (fillRatio > DELAY_THRESHOLD) {
                         if (commitRateLimiter != null) {
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java	(revision 1758575)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ObservationManagerImpl.java	(working copy)
@@ -59,6 +59,7 @@
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
 import org.apache.jackrabbit.oak.plugins.observation.filter.PermissionProviderFactory;
 import org.apache.jackrabbit.oak.plugins.observation.filter.Selectors;
+import org.apache.jackrabbit.oak.spi.commit.EventQueueFactory;
 import org.apache.jackrabbit.oak.spi.commit.Observable;
 import org.apache.jackrabbit.oak.spi.security.authorization.AuthorizationConfiguration;
 import org.apache.jackrabbit.oak.spi.security.authorization.permission.PermissionProvider;
@@ -91,6 +92,7 @@
     private final int queueLength;
     private final CommitRateLimiter commitRateLimiter;
     private final PermissionProviderFactory permissionProviderFactory;
+	private final EventQueueFactory eventQueueFactory;
 
     /**
      * Create a new instance based on a {@link ContentSession} that needs to implement
@@ -104,7 +106,8 @@
      */
     public ObservationManagerImpl(
             SessionContext sessionContext, ReadOnlyNodeTypeManager nodeTypeManager,
-            Whiteboard whiteboard, int queueLength, CommitRateLimiter commitRateLimiter) {
+            Whiteboard whiteboard, int queueLength, CommitRateLimiter commitRateLimiter,
+            EventQueueFactory eventQueueFactory) {
 
         this.sessionDelegate = sessionContext.getSessionDelegate();
         this.authorizationConfig = sessionContext.getSecurityProvider().getConfiguration(AuthorizationConfiguration.class);
@@ -114,6 +117,7 @@
         this.statisticManager = sessionContext.getStatisticManager();
         this.queueLength = queueLength;
         this.commitRateLimiter = commitRateLimiter;
+        this.eventQueueFactory = eventQueueFactory;
         this.permissionProviderFactory = new PermissionProviderFactory() {
             Set<Principal> principals = sessionDelegate.getAuthInfo().getPrincipals();
             @Nonnull
@@ -150,7 +154,7 @@
             // session. See OAK-1368.
             processor = new ChangeProcessor(sessionDelegate.getContentSession(), namePathMapper,
                     tracker, filterProvider, statisticManager, queueLength,
-                    commitRateLimiter);
+                    commitRateLimiter, eventQueueFactory);
             processors.put(listener, processor);
             processor.start(whiteboard);
         } else {
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/OsgiRepository.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/OsgiRepository.java	(revision 1758575)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/osgi/OsgiRepository.java	(working copy)
@@ -24,6 +24,7 @@
 import org.apache.jackrabbit.oak.api.ContentRepository;
 import org.apache.jackrabbit.oak.jcr.repository.RepositoryImpl;
 import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
+import org.apache.jackrabbit.oak.spi.commit.InMemoryLimitedEventQueueFactory;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 
@@ -41,7 +42,9 @@
                           CommitRateLimiter commitRateLimiter,
                           boolean fastQueryResultSize) {
         super(repository, whiteboard, securityProvider, observationQueueLength, 
-                commitRateLimiter, fastQueryResultSize);
+                commitRateLimiter, fastQueryResultSize,
+                // OAK-4581 : TODO: make this configurable/parametrizable
+                new InMemoryLimitedEventQueueFactory(observationQueueLength));
     }
 
     @Override
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java	(revision 1758575)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/repository/RepositoryImpl.java	(working copy)
@@ -58,6 +58,7 @@
 import org.apache.jackrabbit.oak.jcr.session.SessionContext;
 import org.apache.jackrabbit.oak.jcr.session.SessionStats;
 import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
+import org.apache.jackrabbit.oak.spi.commit.EventQueueFactory;
 import org.apache.jackrabbit.oak.spi.gc.DelegatingGCMonitor;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
@@ -123,6 +124,8 @@
 
     private final StatisticManager statisticManager;
 
+	private EventQueueFactory eventQueueFactory;
+
     /**
      * Constructor used for backward compatibility.
      */
@@ -130,9 +133,10 @@
                           @Nonnull Whiteboard whiteboard,
                           @Nonnull SecurityProvider securityProvider,
                           int observationQueueLength,
-                          CommitRateLimiter commitRateLimiter) {
+                          CommitRateLimiter commitRateLimiter,
+                          EventQueueFactory eventQueueFactory) {
         this(contentRepository, whiteboard, securityProvider, 
-                observationQueueLength, commitRateLimiter, false);
+                observationQueueLength, commitRateLimiter, false, eventQueueFactory);
     }
     
     public RepositoryImpl(@Nonnull ContentRepository contentRepository,
@@ -140,12 +144,14 @@
                           @Nonnull SecurityProvider securityProvider,
                           int observationQueueLength,
                           CommitRateLimiter commitRateLimiter,
-                          boolean fastQueryResultSize) {
+                          boolean fastQueryResultSize,
+                          EventQueueFactory eventQueueFactory) {
         this.contentRepository = checkNotNull(contentRepository);
         this.whiteboard = checkNotNull(whiteboard);
         this.securityProvider = checkNotNull(securityProvider);
         this.observationQueueLength = observationQueueLength;
         this.commitRateLimiter = commitRateLimiter;
+        this.eventQueueFactory = eventQueueFactory;
         this.descriptors = determineDescriptors();
         this.statisticManager = new StatisticManager(whiteboard, scheduledExecutor);
         this.clock = new Clock.Fast(scheduledExecutor);
@@ -282,7 +288,7 @@
             SessionContext context = createSessionContext(
                     statisticManager, securityProvider,
                     createAttributes(refreshInterval, relaxedLocking),
-                    sessionDelegate, observationQueueLength, commitRateLimiter);
+                    sessionDelegate, observationQueueLength, commitRateLimiter, eventQueueFactory);
             return context.getSession();
         } catch (LoginException e) {
             throw new javax.jcr.LoginException(e.getMessage(), e);
@@ -332,15 +338,16 @@
      * Factory method for creating a {@link SessionContext} instance for
      * a new session. Called by {@link #login()}. Can be overridden by
      * subclasses to customize the session implementation.
+     * @param eventQueueFactory 
      *
      * @return session context
      */
     protected SessionContext createSessionContext(
             StatisticManager statisticManager, SecurityProvider securityProvider,
             Map<String, Object> attributes, SessionDelegate delegate, int observationQueueLength,
-            CommitRateLimiter commitRateLimiter) {
+            CommitRateLimiter commitRateLimiter, EventQueueFactory eventQueueFactory) {
         return new SessionContext(this, statisticManager, securityProvider, whiteboard, attributes,
-                delegate, observationQueueLength, commitRateLimiter, fastQueryResultSize);
+                delegate, observationQueueLength, commitRateLimiter, fastQueryResultSize, eventQueueFactory);
     }
 
     /**
Index: oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java
===================================================================
--- oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java	(revision 1758575)
+++ oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/session/SessionContext.java	(working copy)
@@ -56,6 +56,7 @@
 import org.apache.jackrabbit.oak.plugins.nodetype.ReadOnlyNodeTypeManager;
 import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
 import org.apache.jackrabbit.oak.plugins.value.ValueFactoryImpl;
+import org.apache.jackrabbit.oak.spi.commit.EventQueueFactory;
 import org.apache.jackrabbit.oak.spi.security.SecurityConfiguration;
 import org.apache.jackrabbit.oak.spi.security.SecurityProvider;
 import org.apache.jackrabbit.oak.spi.security.authorization.AuthorizationConfiguration;
@@ -64,9 +65,9 @@
 import org.apache.jackrabbit.oak.spi.security.user.UserConfiguration;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.spi.xml.ProtectedItemImporter;
-import org.apache.jackrabbit.oak.stats.StatisticManager;
 import org.apache.jackrabbit.oak.stats.CounterStats;
 import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticManager;
 import org.apache.jackrabbit.oak.stats.TimerStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,14 +112,16 @@
     
     private final boolean fastQueryResultSize;
 
+	private final EventQueueFactory eventQueueFactory;
+
     public SessionContext(
              @Nonnull Repository repository, @Nonnull StatisticManager statisticManager,
              @Nonnull SecurityProvider securityProvider, @Nonnull Whiteboard whiteboard,
              @Nonnull Map<String, Object> attributes, @Nonnull final SessionDelegate delegate,
-             int observationQueueLength, CommitRateLimiter commitRateLimiter) {
+             int observationQueueLength, CommitRateLimiter commitRateLimiter, EventQueueFactory eventQueueFactory) {
         
         this(repository, statisticManager, securityProvider, whiteboard, attributes, delegate,
-            observationQueueLength, commitRateLimiter, false);
+            observationQueueLength, commitRateLimiter, false, eventQueueFactory);
     }
 
     public SessionContext(
@@ -126,7 +129,8 @@
             @Nonnull SecurityProvider securityProvider, @Nonnull Whiteboard whiteboard,
             @Nonnull Map<String, Object> attributes, @Nonnull final SessionDelegate delegate,
             int observationQueueLength, CommitRateLimiter commitRateLimiter,
-            boolean fastQueryResultSize) {
+            boolean fastQueryResultSize,
+            EventQueueFactory eventQueueFactory) {
         this.repository = checkNotNull(repository);
         this.statisticManager = statisticManager;
         this.securityProvider = checkNotNull(securityProvider);
@@ -135,6 +139,7 @@
         this.delegate = checkNotNull(delegate);
         this.observationQueueLength = observationQueueLength;
         this.commitRateLimiter = commitRateLimiter;
+        this.eventQueueFactory = eventQueueFactory;
         SessionStats sessionStats = delegate.getSessionStats();
         sessionStats.setAttributes(attributes);
 
@@ -290,7 +295,7 @@
             observationManager = new ObservationManagerImpl(
                 this,
                 ReadOnlyNodeTypeManager.getInstance(delegate.getRoot(), namePathMapper),
-                whiteboard, observationQueueLength, commitRateLimiter);
+                whiteboard, observationQueueLength, commitRateLimiter, eventQueueFactory);
         }
         return observationManager;
     }
Index: oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
===================================================================
--- oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java	(revision 1757362)
+++ oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java	(working copy)
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Collections.emptyMap;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -89,6 +90,8 @@
 import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitorTracker;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateSerializer;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.state.NodeStoreProvider;
 import org.apache.jackrabbit.oak.spi.state.ProxyNodeStore;
@@ -119,7 +122,7 @@
                 "options are supported"
 )
 public class SegmentNodeStoreService extends ProxyNodeStore
-        implements Observable, SegmentStoreProvider {
+        implements Observable, SegmentStoreProvider, NodeStateSerializer {
 
     public static final String NAME = "name";
 
@@ -384,7 +387,10 @@
                 Dictionary<String, Object> props = new Hashtable<String, Object>();
                 props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName());
                 props.put("oak.nodestore.description", new String[]{"nodeStoreType=segment"});
-                storeRegistration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props);
+                storeRegistration = context.getBundleContext().registerService(new String[]{
+                        NodeStore.class.getName(), 
+                        NodeStateSerializer.class.getName()
+                   }, this, props);
             }
         }
     }
@@ -828,4 +834,17 @@
     public String toString() {
         return name + ": " + segmentNodeStore;
     }
+
+    @Override
+    public String serialize(NodeState nodeState) {
+    	checkArgument(nodeState instanceof SegmentNodeState);
+    	SegmentNodeState segmentNodeState = (SegmentNodeState) nodeState;
+    	return segmentNodeState.getRecordId().toString10();
+    }
+    
+    @Override
+    public NodeState deserialize(String serializedNodeStateStr) {
+    	return new SegmentNodeState(RecordId.fromString(store.getTracker(), serializedNodeStateStr));
+    }
+
 }
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java	(revision 1758607)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java	(working copy)
@@ -18,6 +18,7 @@
  */
 package org.apache.jackrabbit.oak.segment;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Collections.emptyMap;
 import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toBoolean;
@@ -82,6 +83,8 @@
 import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
 import org.apache.jackrabbit.oak.spi.gc.GCMonitorTracker;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStateSerializer;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.state.ProxyNodeStore;
 import org.apache.jackrabbit.oak.spi.state.RevisionGC;
@@ -111,7 +114,7 @@
                 "options are supported"
 )
 public class SegmentNodeStoreService extends ProxyNodeStore
-        implements Observable, SegmentStoreProvider {
+        implements Observable, SegmentStoreProvider, NodeStateSerializer {
 
     public static final String NAME = "name";
 
@@ -346,7 +349,10 @@
                 Dictionary<String, Object> props = new Hashtable<String, Object>();
                 props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName());
                 props.put("oak.nodestore.description", new String[]{"nodeStoreType=segment"});
-                storeRegistration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props);
+                storeRegistration = context.getBundleContext().registerService(new String[]{
+                        NodeStore.class.getName(), 
+                        NodeStateSerializer.class.getName()
+                   }, this, props);
             }
         }
     }
@@ -732,6 +738,21 @@
         return getNodeStore().addObserver(observer);
     }
 
+    //---------------------------------------------------< NodeStateSerializer >---
+
+    @Override
+    public String serialize(NodeState nodeState) {
+    	checkArgument(nodeState instanceof SegmentNodeState);
+    	SegmentNodeState segmentNodeState = (SegmentNodeState) nodeState;
+    	return segmentNodeState.getRecordId().toString10();
+    }
+    
+    @Override
+    public NodeState deserialize(String serializedNodeStateStr) {
+    	RecordId recordId = RecordId.fromString(store, serializedNodeStateStr);
+		return store.getReader().readNode(recordId);
+    }
+
     //------------------------------------------------------------< Object >--
 
     @Override
Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentDataStoreBlobGCIT.java
===================================================================
--- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentDataStoreBlobGCIT.java	(revision 1758607)
+++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentDataStoreBlobGCIT.java	(working copy)
@@ -471,7 +471,7 @@
             this.additionalBlobs = Sets.newHashSet();
         }
         
-        @Override
+/*        @Override
         protected void markAndSweep(boolean markOnly) throws Exception {
             boolean threw = true;
             GarbageCollectorFileState fs = new GarbageCollectorFileState(root);
@@ -505,7 +505,7 @@
                 }
             }
         }
-        
+*/        
         public HashSet<String> createAdditional() throws Exception {
             HashSet<String> blobSet = new HashSet<String>();
             NodeBuilder a = nodeStore.getRoot().builder();
Index: oak-segment-tar/pom.xml
===================================================================
--- oak-segment-tar/pom.xml	(revision 1758607)
+++ oak-segment-tar/pom.xml	(working copy)
@@ -34,7 +34,7 @@
     <name>Oak Segment Tar</name>
 
     <properties>
-        <oak.version>1.5.5</oak.version>
+        <oak.version>1.6-SNAPSHOT</oak.version>
         <netty.version>4.0.23.Final</netty.version>
     </properties>
 
