Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (revision 1679162) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStore.java (working copy) @@ -93,6 +93,12 @@ private long maximumBackoff = MILLISECONDS.convert(10, SECONDS); + @Nonnull + public static SegmentNodeStoreBuilder newSegmentNodeStore( + @Nonnull SegmentStore store) { + return SegmentNodeStoreBuilder.newSegmentNodeStore(checkNotNull(store)); + } + public SegmentNodeStore(SegmentStore store) { this.store = store; this.head = new AtomicReference(store.getHead()); Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBuilder.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBuilder.java (revision 0) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBuilder.java (revision 0) @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.segment; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.io.IOException; +import java.util.concurrent.Callable; + +import javax.annotation.Nonnull; + +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy; +import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType; + +import static com.google.common.base.Preconditions.checkState; + +public class SegmentNodeStoreBuilder { + + private final SegmentStore store; + + private boolean isCreated; + + private boolean pauseCompaction; + private boolean cloneBinaries; + private String cleanup; + private long cleanupTs; + private byte memoryThreshold; + private int lockWaitTime; + private CompactionStrategy compactionStrategy; + + static SegmentNodeStoreBuilder newSegmentNodeStore(SegmentStore store) { + return new SegmentNodeStoreBuilder(store); + } + + private SegmentNodeStoreBuilder(@Nonnull SegmentStore store) { + this.store = store; + } + + public SegmentNodeStoreBuilder withCompactionStrategy( + boolean pauseCompaction, boolean cloneBinaries, String cleanup, + long cleanupTs, byte memoryThreshold, final int lockWaitTime) { + this.pauseCompaction = pauseCompaction; + this.cloneBinaries = cloneBinaries; + this.cleanup = cleanup; + this.cleanupTs = cleanupTs; + this.memoryThreshold = memoryThreshold; + this.lockWaitTime = lockWaitTime; + return this; + } + + public CompactionStrategy getCompactionStrategy() { + checkState(isCreated); + return compactionStrategy; + } + + @Nonnull + public SegmentNodeStore create() throws IOException { + checkState(!isCreated); + isCreated = true; + final SegmentNodeStore segmentStore = new SegmentNodeStore(store); + compactionStrategy = new CompactionStrategy(pauseCompaction, + cloneBinaries, CleanupType.valueOf(cleanup), cleanupTs, + memoryThreshold) { + + @Override + public boolean compacted(Callable setHead) + throws Exception { + // Need to guard against concurrent commits to avoid + // mixed segments. See OAK-2192. + return segmentStore.locked(setHead, lockWaitTime, SECONDS); + } + }; + return segmentStore; + } + +} Index: src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (revision 1679183) +++ src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (working copy) @@ -18,7 +18,6 @@ import static com.google.common.base.Preconditions.checkState; import static java.util.Collections.emptyMap; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toBoolean; import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toInteger; import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong; @@ -38,7 +37,6 @@ import java.io.IOException; import java.util.Dictionary; import java.util.Hashtable; -import java.util.concurrent.Callable; import org.apache.commons.io.FilenameUtils; import org.apache.felix.scr.annotations.Activate; @@ -62,7 +60,6 @@ import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType; import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy; -import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType; import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategyMBean; import org.apache.jackrabbit.oak.plugins.segment.compaction.DefaultCompactionStrategyMBean; import org.apache.jackrabbit.oak.plugins.segment.file.FileStore; @@ -220,7 +217,7 @@ private String name; - private SegmentStore store; + private FileStore store; private SegmentNodeStore delegate; @@ -331,18 +328,6 @@ if (memoryThresholdS != null) { memoryThreshold = Byte.valueOf(memoryThresholdS); } - CompactionStrategy compactionStrategy = new CompactionStrategy( - pauseCompaction, cloneBinaries, CleanupType.valueOf(cleanup), cleanupTs, - memoryThreshold) { - @Override - public boolean compacted(Callable setHead) throws Exception { - // Need to guard against concurrent commits to avoid - // mixed segments. See OAK-2192. - return delegate.locked(setHead, lockWaitTime, SECONDS); - } - }; - compactionStrategy.setRetryCount(retryCount); - compactionStrategy.setForceAfterFail(forceCommit); OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext()); gcMonitor = new GCMonitorTracker(); @@ -354,12 +339,20 @@ .withGCMonitor(gcMonitor); if (customBlobStore) { log.info("Initializing SegmentNodeStore with BlobStore [{}]", blobStore); - store = storeBuilder.withBlobStore(blobStore).create() - .setCompactionStrategy(compactionStrategy); + store = storeBuilder.withBlobStore(blobStore).create(); } else { - store = storeBuilder.create() - .setCompactionStrategy(compactionStrategy); + store = storeBuilder.create(); } + SegmentNodeStoreBuilder nodeStoreBuilder = SegmentNodeStore + .newSegmentNodeStore(store); + nodeStoreBuilder.withCompactionStrategy(pauseCompaction, cloneBinaries, + cleanup, cleanupTs, memoryThreshold, lockWaitTime); + delegate = nodeStoreBuilder.create(); + + CompactionStrategy compactionStrategy = nodeStoreBuilder.getCompactionStrategy(); + compactionStrategy.setRetryCount(retryCount); + compactionStrategy.setForceAfterFail(forceCommit); + store.setCompactionStrategy(compactionStrategy); FileStoreGCMonitor fsgcMonitor = new FileStoreGCMonitor(Clock.SIMPLE); fsgcMonitorMBean = new CompositeRegistration( @@ -368,7 +361,6 @@ "File Store garbage collection monitor"), scheduleWithFixedDelay(whiteboard, fsgcMonitor, 1)); - delegate = new SegmentNodeStore(store); observerTracker = new ObserverTracker(delegate); observerTracker.start(context.getBundleContext());