Index: oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java =================================================================== --- oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (revision 1801818) +++ oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (working copy) @@ -43,6 +43,7 @@ import javax.annotation.CheckForNull; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.CommitFailedException; @@ -123,6 +124,10 @@ } SegmentNodeStore(SegmentStore store, boolean internal) { + this(store, internal, true); + } + + SegmentNodeStore(SegmentStore store, boolean internal, boolean dispatchChanges) { if (commitFairLock) { log.info("initializing SegmentNodeStore with the commitFairLock option enabled."); } @@ -129,7 +134,11 @@ this.commitSemaphore = new Semaphore(1, commitFairLock); this.store = store; this.head = new AtomicReference(store.getHead()); - this.changeDispatcher = new ChangeDispatcher(getRoot()); + if (dispatchChanges) { + this.changeDispatcher = new ChangeDispatcher(getRoot()); + } else { + this.changeDispatcher = null; + } } public SegmentNodeStore() throws IOException { @@ -189,12 +198,24 @@ SegmentNodeState state = store.getHead(); if (!state.getRecordId().equals(head.get().getRecordId())) { head.set(state); - changeDispatcher.contentChanged(state.getChildNode(ROOT), null); + contentChanged(state.getChildNode(ROOT), null); } } + private static final Closeable NOOP = new Closeable() { + + @Override + public void close() throws IOException { + // Do nothing. + } + + }; + @Override public Closeable addObserver(Observer observer) { + if (changeDispatcher == null) { + return NOOP; + } return changeDispatcher.addObserver(observer); } @@ -479,7 +500,7 @@ refreshHead(); if (store.setHead(before, after)) { head.set(after); - changeDispatcher.contentChanged(after.getChildNode(ROOT), info); + contentChanged(after.getChildNode(ROOT), info); refreshHead(); return true; } else { @@ -595,4 +616,11 @@ void setCheckpointsLockWaitTime(int checkpointsLockWaitTime) { this.checkpointsLockWaitTime = checkpointsLockWaitTime; } + + void contentChanged(NodeState root, CommitInfo info) { + if (changeDispatcher != null) { + changeDispatcher.contentChanged(root, info); + } + } + } Index: oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBuilder.java =================================================================== --- oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBuilder.java (revision 1801818) +++ oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBuilder.java (working copy) @@ -34,6 +34,8 @@ private boolean isCreated; + private boolean dispatchChanges = true; + private CompactionStrategy compactionStrategy = NO_COMPACTION; private volatile SegmentNodeStore segmentNodeStore; @@ -97,6 +99,11 @@ return this; } + public SegmentNodeStoreBuilder withDispatchChanges(boolean dispatchChanges) { + this.dispatchChanges = dispatchChanges; + return this; + } + public CompactionStrategy getCompactionStrategy() { checkState(isCreated); return compactionStrategy; @@ -106,7 +113,7 @@ public SegmentNodeStore create() { checkState(!isCreated); isCreated = true; - segmentNodeStore = new SegmentNodeStore(store, true); + segmentNodeStore = new SegmentNodeStore(store, true, dispatchChanges); return segmentNodeStore; } Index: oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java =================================================================== --- oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (revision 1801818) +++ oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (working copy) @@ -342,18 +342,16 @@ } private synchronized void registerNodeStore() throws IOException { - if (registerSegmentStore()) { - if (toBoolean(property(STANDBY), false)) { - return; - } - - if (registerSegmentNodeStore()) { - Dictionary props = new Hashtable(); - props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName()); - props.put("oak.nodestore.description", new String[]{"nodeStoreType=segment"}); - storeRegistration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props); - } + if (!registerSegmentStore()) { + return; } + if (toBoolean(property(STANDBY), false)) { + return; + } + Dictionary props = new Hashtable(); + props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName()); + props.put("oak.nodestore.description", new String[] {"nodeStoreType=segment"}); + storeRegistration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props); } private boolean registerSegmentStore() throws IOException { @@ -478,60 +476,16 @@ scheduleWithFixedDelay(whiteboard, fsgcm, 1) ); - // Register a factory service to expose the FileStore - - providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null); - - return true; - } - - private CompactionStrategy newCompactionStrategy() { - boolean pauseCompaction = toBoolean(property(PAUSE_COMPACTION), PAUSE_DEFAULT); - boolean cloneBinaries = toBoolean(property(COMPACTION_CLONE_BINARIES), CLONE_BINARIES_DEFAULT); - long cleanupTs = toLong(property(COMPACTION_CLEANUP_TIMESTAMP), TIMESTAMP_DEFAULT); - int retryCount = toInteger(property(COMPACTION_RETRY_COUNT), RETRY_COUNT_DEFAULT); - boolean forceAfterFail = toBoolean(property(COMPACTION_FORCE_AFTER_FAIL), FORCE_AFTER_FAIL_DEFAULT); - final int lockWaitTime = toInteger(property(COMPACTION_LOCK_WAIT_TIME), COMPACTION_LOCK_WAIT_TIME_DEFAULT); - boolean persistCompactionMap = toBoolean(property(PERSIST_COMPACTION_MAP), PERSIST_COMPACTION_MAP_DEFAULT); - - CleanupType cleanupType = getCleanUpType(); - byte memoryThreshold = getMemoryThreshold(); - byte gainThreshold = getGainThreshold(); - - // This is indeed a dirty hack, but it's needed to break a circular - // dependency between different components. The FileStore needs the - // CompactionStrategy, the CompactionStrategy needs the - // SegmentNodeStore, and the SegmentNodeStore needs the FileStore. - - CompactionStrategy compactionStrategy = new CompactionStrategy(pauseCompaction, cloneBinaries, cleanupType, cleanupTs, memoryThreshold) { - - @Override - public boolean compacted(Callable setHead) throws Exception { - // Need to guard against concurrent commits to avoid - // mixed segments. See OAK-2192. - return segmentNodeStore.locked(setHead, lockWaitTime, SECONDS); - } - - }; - - compactionStrategy.setRetryCount(retryCount); - compactionStrategy.setForceAfterFail(forceAfterFail); - compactionStrategy.setPersistCompactionMap(persistCompactionMap); - compactionStrategy.setGainThreshold(gainThreshold); - - return compactionStrategy; - } - - private boolean registerSegmentNodeStore() throws IOException { Dictionary properties = context.getProperties(); name = String.valueOf(properties.get(NAME)); final long blobGcMaxAgeInSecs = toLong(property(PROP_BLOB_GC_MAX_AGE), DEFAULT_BLOB_GC_MAX_AGE); - OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext()); - SegmentNodeStoreBuilder nodeStoreBuilder = SegmentNodeStore.newSegmentNodeStore(store); nodeStoreBuilder.withCompactionStrategy(compactionStrategy); + if (toBoolean(property(STANDBY), false)) { + nodeStoreBuilder.withDispatchChanges(false); + } segmentNodeStore = nodeStoreBuilder.create(); observerTracker = new ObserverTracker(segmentNodeStore); @@ -579,9 +533,51 @@ } log.info("SegmentNodeStore initialized"); + + // Register a factory service to expose the FileStore + + providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null); + return true; } + private CompactionStrategy newCompactionStrategy() { + boolean pauseCompaction = toBoolean(property(PAUSE_COMPACTION), PAUSE_DEFAULT); + boolean cloneBinaries = toBoolean(property(COMPACTION_CLONE_BINARIES), CLONE_BINARIES_DEFAULT); + long cleanupTs = toLong(property(COMPACTION_CLEANUP_TIMESTAMP), TIMESTAMP_DEFAULT); + int retryCount = toInteger(property(COMPACTION_RETRY_COUNT), RETRY_COUNT_DEFAULT); + boolean forceAfterFail = toBoolean(property(COMPACTION_FORCE_AFTER_FAIL), FORCE_AFTER_FAIL_DEFAULT); + final int lockWaitTime = toInteger(property(COMPACTION_LOCK_WAIT_TIME), COMPACTION_LOCK_WAIT_TIME_DEFAULT); + boolean persistCompactionMap = toBoolean(property(PERSIST_COMPACTION_MAP), PERSIST_COMPACTION_MAP_DEFAULT); + + CleanupType cleanupType = getCleanUpType(); + byte memoryThreshold = getMemoryThreshold(); + byte gainThreshold = getGainThreshold(); + + // This is indeed a dirty hack, but it's needed to break a circular + // dependency between different components. The FileStore needs the + // CompactionStrategy, the CompactionStrategy needs the + // SegmentNodeStore, and the SegmentNodeStore needs the FileStore. + + CompactionStrategy compactionStrategy = new CompactionStrategy(pauseCompaction, cloneBinaries, cleanupType, cleanupTs, memoryThreshold) { + + @Override + public boolean compacted(Callable setHead) throws Exception { + // Need to guard against concurrent commits to avoid + // mixed segments. See OAK-2192. + return segmentNodeStore.locked(setHead, lockWaitTime, SECONDS); + } + + }; + + compactionStrategy.setRetryCount(retryCount); + compactionStrategy.setForceAfterFail(forceAfterFail); + compactionStrategy.setPersistCompactionMap(persistCompactionMap); + compactionStrategy.setGainThreshold(gainThreshold); + + return compactionStrategy; + } + private void unregisterNodeStore() { if (segmentCacheMBean != null) { segmentCacheMBean.unregister(); Index: . =================================================================== --- . (revision 1801818) +++ . (working copy) Property changes on: . ___________________________________________________________________ Modified: svn:mergeinfo ## -0,0 +0,1 ## Merged /jackrabbit/oak/trunk:r1764475