diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
index 4dffa92..aa4ffa8 100644
--- oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
+++ oak-core/src/main/java/org/apache/jackrabbit/oak/spi/commit/BackgroundObserver.java
@@ -202,6 +202,10 @@ public class BackgroundObserver implements Observer, Closeable {
         stopped = true;
     }
 
+    public int getQueueSize(){
+        return queue.size();
+    }
+
     //----------------------------------------------------------< Observer >--
 
     /**
diff --git oak-parent/pom.xml oak-parent/pom.xml
index 2216499..4ffd048 100644
--- oak-parent/pom.xml
+++ oak-parent/pom.xml
@@ -58,7 +58,7 @@
     <slf4j.api.version>1.7.6</slf4j.api.version>
     <slf4j.version>1.7.6</slf4j.version> <!-- sync with logback version -->
     <logback.version>1.1.0</logback.version>
-    <h2.version>1.4.182</h2.version>
+    <h2.version>1.4.185</h2.version>
     <findbugs.version>3.0.0</findbugs.version>
 
    <!-- specifies on which fixture to run the integration testing tests. 
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
index 78efc75..4f32dd8 100644
--- oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
@@ -210,6 +210,7 @@ public class BenchmarkRunner {
             new CreateNodesBenchmark(),
             new ManyNodes(),
             new ObservationTest(),
+            new ObservationTest2(),
             new XmlImportTest(),
             new FlatTreeWithAceForSamePrincipalTest(),
             new ReadDeepTreeTest(
diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest2.java oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest2.java
new file mode 100644
index 0000000..66799fd
--- /dev/null
+++ oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ObservationTest2.java
@@ -0,0 +1,843 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.benchmark;
+
+import static javax.jcr.observation.Event.NODE_ADDED;
+import static javax.jcr.observation.Event.NODE_MOVED;
+import static javax.jcr.observation.Event.NODE_REMOVED;
+import static javax.jcr.observation.Event.PERSIST;
+import static javax.jcr.observation.Event.PROPERTY_ADDED;
+import static javax.jcr.observation.Event.PROPERTY_CHANGED;
+import static javax.jcr.observation.Event.PROPERTY_REMOVED;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jcr.Node;
+import javax.jcr.PathNotFoundException;
+import javax.jcr.Property;
+import javax.jcr.Repository;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.SimpleCredentials;
+import javax.jcr.ValueFormatException;
+import javax.jcr.lock.LockException;
+import javax.jcr.nodetype.ConstraintViolationException;
+import javax.jcr.observation.Event;
+import javax.jcr.observation.EventIterator;
+import javax.jcr.observation.EventListener;
+import javax.jcr.observation.ObservationManager;
+import javax.jcr.version.VersionException;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.Query;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closer;
+import com.google.common.primitives.Longs;
+import org.apache.jackrabbit.JcrConstants;
+import org.apache.jackrabbit.api.observation.JackrabbitEvent;
+import org.apache.jackrabbit.commons.JcrUtils;
+import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.api.jmx.RepositoryManagementMBean;
+import org.apache.jackrabbit.oak.api.jmx.RepositoryStatsMBean;
+import org.apache.jackrabbit.oak.fixture.JcrCreator;
+import org.apache.jackrabbit.oak.fixture.OakRepositoryFixture;
+import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
+import org.apache.jackrabbit.oak.jcr.Jcr;
+import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ObservationTest2 extends Benchmark {
+    public static final int EVENT_TYPES = NODE_ADDED | NODE_REMOVED | NODE_MOVED |
+            PROPERTY_ADDED | PROPERTY_REMOVED | PROPERTY_CHANGED | PERSIST;
+    private static final int EVENTS_PER_NODE = 2; // NODE_ADDED and PROPERTY_ADDED
+    private static final int SAVE_INTERVAL = Integer.getInteger("saveInterval", 100);
+    private static final int OUTPUT_RESOLUTION = 100;
+    private static final int LISTENER_COUNT = Integer.getInteger("listenerCount", 200);
+    public static final String OAK_UNSTRUCTURED = "oak:Unstructured";
+    public static final String TYPE = "type";
+    public static final String TYPE_ASSET = "asset";
+    private final ExecutorService executorService = Executors.newFixedThreadPool(4);
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private static final Clock CLOCK = new Clock.Virtual();
+
+    @Override
+    public void run(Iterable<RepositoryFixture> fixtures) {
+        for (RepositoryFixture fixture : fixtures) {
+            if (fixture.isAvailable(1)) {
+                final AtomicReference<Whiteboard> whiteboardRef = new AtomicReference<Whiteboard>();
+                System.out.format("%s: Observation throughput benchmark%n", fixture);
+                try {
+                    Repository[] cluster;
+                    if (fixture instanceof OakRepositoryFixture){
+                        cluster = ((OakRepositoryFixture) fixture).setUpCluster(1, new JcrCreator() {
+                            @Override
+                            public Jcr customize(Oak oak) {
+                                whiteboardRef.set(oak.getWhiteboard());
+                                return new Jcr(oak);
+                            }
+                        });
+                    } else {
+                        cluster = fixture.setUpCluster(1);
+                    }
+                    try {
+                        run(cluster[0], whiteboardRef.get(), "server0");
+                    } finally {
+                        fixture.tearDownCluster();
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        executorService.shutdownNow();
+    }
+
+    private void run(Repository repository, Whiteboard whiteboard, String clusterId) throws RepositoryException,
+            ExecutionException, InterruptedException, IOException {
+        Session session = createSession(repository);
+        long t0 = System.currentTimeMillis();
+        try {
+            observationThroughput(repository, whiteboard, clusterId);
+        } finally {
+            System.out.println("Time elapsed: " + (System.currentTimeMillis() - t0) + " ms");
+            session.logout();
+        }
+    }
+
+    public void observationThroughput(final Repository repository, Whiteboard whiteboard, final String clusterId)
+            throws RepositoryException, InterruptedException, ExecutionException, IOException {
+        Stopwatch watch = Stopwatch.createStarted();
+        long t = 0;
+        final int assetsPerLevel = 100;
+        final int noOfLevels = 10;
+        final int noOfAssets = assetsPerLevel * noOfLevels;
+        final AtomicInteger localEventCount = new AtomicInteger();
+        final AtomicInteger externalEventCount = new AtomicInteger();
+        final AtomicInteger nodeCount = new AtomicInteger();
+        final AtomicInteger assetCount = new AtomicInteger();
+        Closer closer = Closer.create();
+
+        final Session[] sessions = new Session[LISTENER_COUNT];
+        EventListener[] listeners = new Listener[LISTENER_COUNT];
+
+        try {
+            for (int k = 0; k < LISTENER_COUNT; k++) {
+                sessions[k] = createSession(repository);
+                listeners[k] = new Listener(localEventCount, externalEventCount);
+                ObservationManager obsMgr = sessions[k].getWorkspace().getObservationManager();
+                obsMgr.addEventListener(listeners[k], EVENT_TYPES, PATHS[k], true, null, null, false);
+            }
+
+            BlockingQueue<String> jobs = new LinkedBlockingDeque<String>();
+            JobManager jobManager = new JobManager(repository, jobs, executorService, clusterId);
+            Future<Integer> submittedJobs = executorService.submit(jobManager);
+
+            WorkflowListener wfListener = new WorkflowListener(createSession(repository), jobs, assetCount);
+
+            closer.register(new AuditListener(createSession(repository)));
+
+            Future<?> createNodes = executorService.submit(new Runnable() {
+                private final Session session = createSession(repository);
+
+                @Override
+                public void run() {
+                    try {
+                        bootstrap(session, clusterId);
+                        Node testRoot = session.getRootNode().getNode("content/dam").getNode(clusterId);
+                        createChildren(testRoot, noOfLevels);
+                        for (Node m : JcrUtils.getChildNodes(testRoot)) {
+                            createAssets(session, m.getPath(), assetsPerLevel, assetCount);
+                        }
+                        session.save();
+                    } catch (RepositoryException e) {
+                        throw new RuntimeException(e);
+                    } finally {
+                        session.logout();
+                    }
+                }
+
+                private void createChildren(Node node, int count) throws RepositoryException {
+                    for (int c = 0; c < count; c++) {
+                        node.addNode("n" + c);
+                        if (nodeCount.incrementAndGet() % SAVE_INTERVAL == 0) {
+                            node.getSession().save();
+                        }
+                    }
+                }
+            });
+
+            System.out.println("ms      #node   nodes/s #event  #external event/s event ratio");
+            while (!createNodes.isDone() || (assetCount.get() < noOfAssets)) {
+                long t0 = System.currentTimeMillis();
+                Thread.sleep(OUTPUT_RESOLUTION);
+                t += System.currentTimeMillis() - t0;
+
+                int nc = nodeCount.get();
+                int ec = localEventCount.get() / LISTENER_COUNT;
+
+                double nps = (double) nc / t * 1000;
+                double eps = (double) ec / t * 1000;
+                double epn = (double) ec / nc / EVENTS_PER_NODE;
+
+                System.out.format("%7d %7d %7.1f %7d %7d %7.1f %1.2f %s%n", t, nc, nps, ec,
+                        externalEventCount.get(), eps, epn, getPerSecondStats(whiteboard));
+            }
+            createNodes.get();
+            log.info("All main assets created");
+
+            //Wait for workflows to finish
+            wfListener.setExpectedAssetCount(assetCount.get());
+            wfListener.close();
+            log.info("All workflows triggered");
+
+            jobManager.initiateShutdown();
+            long submittedJobsCount = submittedJobs.get();
+            log.info("All required jobs scheduled {}",submittedJobsCount);
+
+            jobManager.awaitCompletion();
+            log.info("All required jobs processed");
+
+            waitForListenerQueuesToFinish(whiteboard);
+            log.info("All queues empty now. Job done!!");
+
+            System.out.printf("Expected assets %d, created assets %d, jobs submitted %d, time taken %s %n", noOfAssets, assetCount.get(), submittedJobsCount, watch);
+
+            closer.close();
+        } finally {
+            for (int k = 0; k < LISTENER_COUNT; k++) {
+                sessions[k].getWorkspace().getObservationManager().removeEventListener(listeners[k]);
+                sessions[k].logout();
+            }
+        }
+    }
+
+    private void waitForListenerQueuesToFinish(Whiteboard wb) throws InterruptedException {
+        List<Observer> observers = WhiteboardUtils.getServices(wb, Observer.class);
+        int queueSize;
+        while ((queueSize = getMaxQueueSize(observers)) > 0){
+            TimeUnit.SECONDS.sleep(1);
+            log.info("Queue size {}", queueSize);
+        }
+        return;
+    }
+
+    private int getMaxQueueSize(List<Observer> observers) {
+        List<Integer> queueStats = getQueueStats(observers);
+        Collections.sort(queueStats, Collections.reverseOrder());
+        return queueStats.size() > 0 ? queueStats.get(0) : 0;
+    }
+
+    private List<Integer> getQueueStats(List<Observer> observers) {
+        List<Integer> stats = Lists.newArrayList();
+        for (Observer o : observers){
+            if (o instanceof BackgroundObserver){
+                int size = ((BackgroundObserver) o).getQueueSize();
+                if (size > 0) {
+                    stats.add(size);
+                }
+            }
+        }
+        return stats;
+    }
+
+    private void bootstrap(Session session, String clusterId) throws RepositoryException {
+        JcrUtils.getOrCreateByPath("/content/dam/"+clusterId, OAK_UNSTRUCTURED, OAK_UNSTRUCTURED, session, true);
+        JcrUtils.getOrCreateByPath("/var/audit/dam", OAK_UNSTRUCTURED, OAK_UNSTRUCTURED, session, true);
+        JcrUtils.getOrCreateByPath("/etc/workflow/instances", OAK_UNSTRUCTURED, OAK_UNSTRUCTURED, session, true);
+        JcrUtils.getOrCreateByPath("/var/eventing/jobs/assigned", OAK_UNSTRUCTURED, OAK_UNSTRUCTURED, session, true);
+    }
+
+    private String getPerSecondStats(Whiteboard wb) {
+        if (wb == null) {
+            return "<unknonwn>";
+        }
+        RepositoryStatsMBean mbean = WhiteboardUtils.getService(wb, RepositoryStatsMBean.class);
+        long[] perSecond = (long[]) mbean.getObservationQueueMaxLength().get("per second");
+        List<Long> data = Longs.asList(perSecond);
+        Collections.reverse(data);
+        return data.toString();
+    }
+
+    private void createAssets(Session session, String basePath, int count, AtomicInteger assetCount) throws RepositoryException {
+        for (int i = 0 ; i < count; i ++){
+            createAsset(session, basePath + "/test"+i+".png");
+            assetCount.incrementAndGet();
+        }
+    }
+
+    private String createAsset(Session session, String path) throws RepositoryException {
+        Node assetNode = JcrUtils.getOrCreateByPath(path, OAK_UNSTRUCTURED, session);
+        assetNode.setProperty(TYPE, TYPE_ASSET);
+        Node content = assetNode.addNode(JcrConstants.JCR_CONTENT);
+        content.addNode("metadata");
+        content.addNode("related");
+        Node renditions = content.addNode("renditions");
+        Node original = renditions.addNode("original");
+        Node originalContent = original.addNode(JcrConstants.JCR_CONTENT);
+        originalContent.setProperty(JcrConstants.JCR_LASTMODIFIED, Calendar.getInstance());
+        session.save();
+        return assetNode.getPath();
+    }
+
+    private static Session createSession(Repository repository)
+            throws RepositoryException {
+        return repository.login(new SimpleCredentials("admin", "admin".toCharArray()));
+    }
+
+    private static class Listener implements EventListener {
+        private final AtomicInteger localEventCount;
+        private final AtomicInteger externalEventCount;
+
+        public Listener(AtomicInteger localEventCount, AtomicInteger externalEventCount) {
+            this.localEventCount = localEventCount;
+            this.externalEventCount = externalEventCount;
+        }
+
+        @Override
+        public void onEvent(EventIterator events) {
+            while(events.hasNext()){
+                JackrabbitEvent event = (JackrabbitEvent) events.nextEvent();
+                if(event.isExternal()){
+                    externalEventCount.incrementAndGet();
+                } else {
+                    localEventCount.incrementAndGet();
+                }
+            }
+        }
+    }
+
+    private static class AuditListener implements EventListener, Closeable {
+        private final Logger log = LoggerFactory.getLogger(getClass());
+        private final Session session;
+
+        public AuditListener(Session session) throws RepositoryException {
+            this.session = session;
+            session.getWorkspace().getObservationManager()
+                    .addEventListener(this, EVENT_TYPES, "/content", true, null, null, true);
+        }
+
+        @Override
+        public void onEvent(EventIterator events) {
+           try{
+               onEvent0(events);
+           }catch (RepositoryException e){
+               log.warn("Error occurred", e);
+           }
+        }
+
+        private void onEvent0(EventIterator events) throws RepositoryException {
+            while(events.hasNext()){
+                Event e = events.nextEvent();
+                if (e.getType() == Event.PROPERTY_ADDED){
+                    Property type = session.getProperty(e.getPath());
+                    if (TYPE_ASSET.equals(type.getString())){
+                        Node addedNode = type.getParent();
+                        String auditNodePath = "/var/audit/dam" + addedNode.getPath() + "/" + UUID.randomUUID().toString();
+                        Node auditNode = JcrUtils.getOrCreateByPath(auditNodePath, OAK_UNSTRUCTURED, OAK_UNSTRUCTURED, session, false);
+                        auditNode.setProperty("time", Calendar.getInstance());
+                        auditNode.setProperty("userid", "admin");
+                        auditNode.setProperty("path", addedNode.getPath());
+                        auditNode.setProperty("category", "dam");
+                        session.save();
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                session.getWorkspace().getObservationManager().removeEventListener(this);
+            } catch (RepositoryException e) {
+                throw new RuntimeException(e);
+            }
+            session.logout();
+        }
+    }
+
+    private static class WorkflowListener implements EventListener, Closeable {
+        private final Logger log = LoggerFactory.getLogger(getClass());
+        private final Session session;
+        private final AtomicInteger count = new AtomicInteger();
+        private final BlockingQueue<String> instanceQueue;
+        private int expectedAssetCount = -1;
+        private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+        public WorkflowListener(Session session, BlockingQueue<String> instanceQueue,
+                                AtomicInteger assetCount) throws RepositoryException {
+            this.session = session;
+            session.getWorkspace().getObservationManager()
+                    .addEventListener(this, EVENT_TYPES, "/content", true, null, null, true);
+            this.instanceQueue = instanceQueue;
+        }
+
+        @Override
+        public void onEvent(EventIterator events) {
+            try{
+                onEvent0(events);
+            }catch (RepositoryException e){
+                log.warn("Error occurred", e);
+            }
+        }
+
+        public void setExpectedAssetCount(int expectedAssetCount) {
+            this.expectedAssetCount = expectedAssetCount;
+        }
+
+        private void onEvent0(EventIterator events) throws RepositoryException {
+            while(events.hasNext()){
+                JackrabbitEvent e = (JackrabbitEvent) events.nextEvent();
+                if (e.getType() == Event.PROPERTY_ADDED){
+                    Property type = session.getProperty(e.getPath());
+                    if (TYPE_ASSET.equals(type.getString())){
+                        int id = count.incrementAndGet();
+                        if (e.isExternal()){
+                            if (id == expectedAssetCount){
+                                shutdownLatch.countDown();
+                            }
+                            log.warn("External event found");
+                            continue;
+                        }
+
+                        Node addedNode = type.getParent();
+                        String wfInstanceNodePath = "/etc/workflow/instances/model_" + System.currentTimeMillis() + "/" + id;
+                        Node wfNode = JcrUtils.getOrCreateByPath(wfInstanceNodePath, OAK_UNSTRUCTURED, OAK_UNSTRUCTURED, session, false);
+                        wfNode.setProperty("modelId", id);
+                        wfNode.setProperty("startTime", Calendar.getInstance());
+                        wfNode.setProperty("initiator", "admin");
+                        wfNode.addNode("metaData");
+                        wfNode.addNode("workItems");
+                        Node data = wfNode.addNode("data");
+
+                        Node payload = data.addNode("payload");
+                        Node metadata = data.addNode("metadata");
+                        payload.setProperty("path", addedNode.getPath());
+                        metadata.setProperty("userid", "admin");
+                        session.save();
+
+                        Node history  = wfNode.addNode("history");
+                        addHistory(history);
+
+                        metadata.setProperty("currentJobs", "foo");
+                        metadata.setProperty("userId", "admin");
+                        metadata.setProperty("launcherId", "foo");
+
+                        session.save();
+
+                        instanceQueue.add(wfInstanceNodePath);
+
+                        if (id == expectedAssetCount){
+                            shutdownLatch.countDown();
+                        }
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void close() throws IOException {
+            try{
+                shutdownLatch.await();
+            } catch (InterruptedException ignore) {
+                log.warn("WorkflowListener interrupted");
+            }
+
+            try {
+                session.getWorkspace().getObservationManager().removeEventListener(this);
+            } catch (RepositoryException e) {
+                throw new RuntimeException(e);
+            }
+            session.logout();
+            log.info("Workflow listener shutdown completed. Processed {} assets", expectedAssetCount);
+        }
+    }
+
+    private static void addHistory(Node history) throws RepositoryException {
+        Node time  = history.addNode(""+CLOCK.getTime());
+        Node workItem  = time.addNode("workItem");
+        Node metadata2  = workItem.addNode("metadata");
+        metadata2.setProperty("archived", true);
+
+        workItem.setProperty("nodeId", 1);
+        workItem.setProperty("assigne", "admin");
+        workItem.setProperty("startTime", Calendar.getInstance());
+
+        time.setProperty("date", Calendar.getInstance());
+        time.setProperty("message", "foo");
+        time.setProperty("user", "admin");
+    }
+
+    private static class JobManager implements Callable<Integer>{
+        public static final String POISON_PILL = "--TERMINATE--";
+        private final AtomicInteger count = new AtomicInteger();
+        private final AtomicInteger processedCount = new AtomicInteger();
+        private final BlockingQueue<String> instanceQueue;
+        private final Session session;
+        private final ExecutorService executorService;
+        private final Repository repository;
+        private final String clusterId;
+        private final CountDownLatch completed = new CountDownLatch(1);
+        private volatile boolean allJobsSubmitted;
+
+        public JobManager(Repository repository,
+                           BlockingQueue<String> instanceQueue,
+                           ExecutorService executorService,
+                           String clusterId
+        ) throws RepositoryException {
+            this.instanceQueue = instanceQueue;
+            this.session = createSession(repository);
+            this.executorService = executorService;
+            this.repository = repository;
+            this.clusterId = clusterId;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            String path;
+            while((path = instanceQueue.take()) != POISON_PILL){
+                int id = count.incrementAndGet();
+                final String jobPath = createJob(path, id);
+
+                executorService.submit(new Callable<Object>() {
+                    @Override
+                    public Object call() throws Exception {
+                        return processJob(jobPath);
+                    }
+                });
+            }
+            session.logout();
+            allJobsSubmitted = true;
+            return count.get();
+        }
+
+        public void awaitCompletion() throws InterruptedException {
+            completed.await();
+        }
+
+        public void initiateShutdown() {
+            instanceQueue.add(POISON_PILL);
+        }
+
+        private Object processJob(String jobPath) throws RepositoryException {
+            Session session = createSession(repository);
+            try{
+                Node jobNode = session.getNode(jobPath);
+                String wfPath = jobNode.getProperty("path").getString();
+                processWorkflow(wfPath, session);
+                jobNode.remove();
+                session.save();
+                processedCount.incrementAndGet();
+            }finally {
+                session.logout();
+            }
+
+            if (allJobsSubmitted && processedCount.get() >= count.get()){
+                completed.countDown();
+            }
+            return null;
+        }
+
+        private void processWorkflow(String wfPath, Session session) throws RepositoryException {
+            Node wfNode = session.getNode(wfPath);
+            Node history = wfNode.getNode("history");
+            addHistory(history);
+            session.save();
+
+            String assetPath = wfNode.getProperty("data/payload/path").getString();
+            Node assetNode = session.getNode(assetPath);
+            Node metadata = assetNode.getNode("jcr:content/metadata");
+            setProps(metadata, 15);
+            session.save();
+
+            for (int i = 0; i < 20; i++) {
+                addHistory(history);
+            }
+        }
+
+        private static void setProps(Node n, int count) throws RepositoryException {
+            for (int i = 0; i < count; i++) {
+                n.setProperty("a"+i, i);
+            }
+        }
+
+        private String createJob(String path, int id) throws RepositoryException {
+            String jobPath = String.format("/var/eventing/jobs/assigned/%s/model/2015/3/23/15/11/%s_%d", clusterId, clusterId, id);
+            Node jobNode = JcrUtils.getOrCreateByPath(jobPath, OAK_UNSTRUCTURED, OAK_UNSTRUCTURED, session, true);
+            jobNode.setProperty("a", "b");
+            jobNode.setProperty("path", path);
+            session.save();
+            return jobNode.getPath();
+
+        }
+    }
+
+
+    private static final String[] PATHS = {
+            "/etc/cloudservices/audiencemanager",
+            "/content/campaigns",
+            "/content/dam",
+            "/content/publications",
+            "/etc/cloudservices",
+            "/etc/cloudservices",
+            "/etc/cloudservices/twitterconnect",
+            "/etc/cloudservices/facebookconnect",
+            "/var/community/create",
+            "/home/users",
+            "/home/users",
+            "/etc",
+            "/libs",
+            "/apps",
+            "/etc/workflow/packages",
+            "/etc/workflow",
+            "/var/mailimport",
+            "/content/dam",
+            "/content/usergenerated",
+            "/var/statistics/tracking/dummy",
+            "/etc/workflow/launcher/config",
+            "/home/users",
+            "/content/forms/fp",
+            "/etc/social/groups",
+            "/etc/workflow/launcher/config",
+            "/etc/workflow/models",
+            "/content/dam",
+            "/libs",
+            "/apps",
+            "/content/dam",
+            "/content/dam",
+            "/etc/cloudservices/proxy/ids",
+            "/content/dam",
+            "/etc/cloudservices/dynamicmediaservices",
+            "/content/campaigns",
+            "/etc",
+            "/content",
+            "/etc",
+            "/content",
+            "/etc/replication",
+            "/var/linkchecker",
+            "/libs",
+            "/apps",
+            "/libs",
+            "/apps",
+            "/content",
+            "/etc/designs",
+            "/libs/cq/linkchecker",
+            "/libs",
+            "/apps",
+            "/etc",
+            "/etc",
+            "/content",
+            "/content/campaigns",
+            "/content/campaigns",
+            "/content",
+            "/etc/blueprints",
+            "/content",
+            "/content",
+            "/etc/msm/rolloutconfigs",
+            "/content",
+            "/content/usergenerated/content",
+            "/content/usergenerated/content",
+            "/content/usergenerated/content",
+            "/content/usergenerated/content",
+            "/content/usergenerated/content",
+            "/content/usergenerated/content",
+            "/content",
+            "/content/usergenerated/content",
+            "/content/usergenerated/content",
+            "/apps",
+            "/libs",
+            "/libs/social/storage/install",
+            "/libs/cq/searchpromote/config",
+            "/libs/fd/fm/install",
+            "/libs/cq/address/install",
+            "/libs/wcm/notification/config",
+            "/libs/granite/monitoring/config",
+            "/libs/social/commons/install",
+            "/libs/social/filelibrary/install",
+            "/libs/cq/linkchecker/config.author",
+            "/libs/granite/contexthub/install",
+            "/libs/dam/config",
+            "/libs/cq/chart/install",
+            "/libs/cq/activitystreams/install",
+            "/libs/social/tally/install",
+            "/libs/mcm/campaign/install",
+            "/libs/social/scoring/install",
+            "/libs/cq/cloudservices/install",
+            "/libs/cq/analytics/config",
+            "/libs/cq/experiencelog/install",
+            "/libs/social/messaging/install",
+            "/libs/cq/commons/install",
+            "/libs/cq/replication/config",
+            "/libs/cq/statistics/config",
+            "/libs/dam/install",
+            "/libs/wcm/workflow/install",
+            "/libs/wcm/emulator/install",
+            "/libs/cq/core/config.author",
+            "/libs/fd/af/install",
+            "/libs/wcm/translation/install",
+            "/libs/granite/workflow/config",
+            "/libs/social/console/config",
+            "/libs/sightly/js/config",
+            "/libs/cq/tagmanager/install",
+            "/libs/cq/dashboards/install",
+            "/libs/media/publishing/install",
+            "/libs/social/console/install",
+            "/libs/social/qna/install",
+            "/libs/wcm/msm/config",
+            "/libs/cq/securityhc/install",
+            "/libs/cq/tagging/install",
+            "/libs/cq/security/install",
+            "/libs/foundation/install",
+            "/libs/cq/healthcheck/install",
+            "/libs/cq/dtm/config",
+            "/libs/wcm/core/install",
+            "/libs/wcm/mobile/config.author",
+            "/libs/cq/security/config.author",
+            "/libs/sling/config",
+            "/libs/commerce/install",
+            "/apps/geometrixx/install",
+            "/libs/social/socialgraph/install",
+            "/libs/cq/targetrecommendations/config",
+            "/libs/wcm/contentsync/install",
+            "/apps/social/twitterprovider/config",
+            "/libs/cq/platform/install",
+            "/libs/wcm/taglib/install",
+            "/libs/social/cloud/install",
+            "/libs/granite/operations/config",
+            "/libs/cq/analytics/install",
+            "/libs/wcm/scripting/install",
+            "/libs/social/sync/install",
+            "/libs/wcm/msm/install",
+            "/apps/social/facebookprovider/config",
+            "/libs/social/ugcbase/install",
+            "/libs/mobileapps/admin/config",
+            "/libs/mcm/exacttarget/install",
+            "/libs/cq/searchpromote/install",
+            "/libs/granite/offloading/config.author",
+            "/libs/social/blog/install",
+            "/libs/cq/security/config",
+            "/libs/media/articles/install",
+            "/libs/fd/fm/config",
+            "/libs/wcm/designimporter/install",
+            "/libs/dam/config.author",
+            "/libs/xfaforms/install",
+            "/libs/cq/healthcheck/config",
+            "/libs/social/forum/install",
+            "/apps/geometrixx-media/config",
+            "/libs/mcm/config",
+            "/libs/cq/apple/install",
+            "/apps/geometrixx/config",
+            "/libs/social/moderation/install",
+            "/libs/fd/fp/install",
+            "/libs/wcm/core/config",
+            "/libs/system/config",
+            "/libs/social/calendar/install",
+            "/libs/wcm/mobile/install",
+            "/libs/wcm/msm/config.author",
+            "/libs/wcm/contentsync/config",
+            "/libs/wcm/sightly/install",
+            "/libs/mcm/salesforce/install",
+            "/libs/social/activitystreams/config",
+            "/libs/wcm/webservice-support/install",
+            "/libs/wcm/undo/install",
+            "/libs/cq/history/config.author",
+            "/libs/social/connect/install",
+            "/libs/cq/linkchecker/config",
+            "/libs/cq/compat/install",
+            "/libs/wcm/resource-details/install",
+            "/libs/social/journal/install",
+            "/libs/system/config.author",
+            "/libs/cq/commons/config.author",
+            "/libs/commerce/config",
+            "/libs/granite/translation/install",
+            "/libs/social/config",
+            "/libs/cq/tagmanager/config",
+            "/apps/geometrixx-commons/install",
+            "/libs/social/group/install",
+            "/apps/geometrixx-gov/config",
+            "/libs/social/reviews/install",
+            "/libs/cq/reporting/config",
+            "/libs/social/activitystreams/install",
+            "/libs/cq/config",
+            "/libs/social/translation/install",
+            "/libs/granite/offloading/config",
+            "/libs/wcm/notification/install",
+            "/libs/cq/searchpromote/config.author",
+            "/libs/launches/install",
+            "/apps/geometrixx-commons/config",
+            "/libs/cc/install",
+            "/libs/cq/replication/config.author",
+            "/libs/cq/ui/install",
+            "/libs/granite/distribution/install",
+            "/libs/wcm/launches/install",
+            "/libs/sling/install",
+            "/libs/mcm/install",
+            "/libs/cq/contentinsight/install",
+            "/libs/wcm/offline/install",
+            "/libs/wcm/webservice-support/config",
+            "/libs/social/ugc/install",
+            "/libs/wcm/core/config.author",
+            "/libs/fd/af/config",
+            "/libs/cq/projects/install",
+            "/libs/fd/thirdparty/install",
+            "/libs/granite/ocs/config.author",
+            "/libs/cq/rest/install",
+            "/libs/mcm/silverpop/install",
+            "/libs/cq/search/config",
+            "/libs/connector/install",
+            "/apps/social/messaging/config",
+            "/libs/cq/workflow/config",
+            "/libs/mobileapps/install",
+            "/libs/wcm/siteimporter/install",
+            "/apps/geometrixx-outdoors-app/config",
+            "/libs/cq/core/config",
+            "/libs/cq/personalization/config",
+            "/libs/social/sync/install.author",
+            "/apps/geometrixx-outdoors/config",
+    };
+
+}
