diff --git oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java index 4b5189b..b057bde 100644 --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java @@ -16,28 +16,6 @@ */ package org.apache.jackrabbit.oak.plugins.segment; -import static com.google.common.base.Preconditions.checkState; -import static java.util.Collections.emptyMap; -import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toBoolean; -import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toInteger; -import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong; -import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CLEANUP_DEFAULT; -import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CLONE_BINARIES_DEFAULT; -import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.FORCE_AFTER_FAIL_DEFAULT; -import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.MEMORY_THRESHOLD_DEFAULT; -import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.PAUSE_DEFAULT; -import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.RETRY_COUNT_DEFAULT; -import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.TIMESTAMP_DEFAULT; -import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean; -import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleWithFixedDelay; - -import java.io.ByteArrayInputStream; -import java.io.Closeable; -import java.io.File; -import java.io.IOException; -import java.util.Dictionary; -import java.util.Hashtable; - import org.apache.commons.io.FilenameUtils; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; @@ -86,6 +64,30 @@ import org.osgi.service.component.ComponentContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.Dictionary; +import java.util.Hashtable; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Collections.emptyMap; +import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toBoolean; +import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toInteger; +import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CLEANUP_DEFAULT; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CLONE_BINARIES_DEFAULT; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.FORCE_AFTER_FAIL_DEFAULT; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.MEMORY_THRESHOLD_DEFAULT; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.PAUSE_DEFAULT; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.RETRY_COUNT_DEFAULT; +import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.TIMESTAMP_DEFAULT; +import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean; +import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleWithFixedDelay; + /** * An OSGi wrapper for the segment node store. */ @@ -105,14 +107,14 @@ public class SegmentNodeStoreService extends ProxyNodeStore @Property( label = "Directory", - description="Directory location used to store the segment tar files. If not specified then looks " + + description = "Directory location used to store the segment tar files. If not specified then looks " + "for framework property 'repository.home' otherwise use a subdirectory with name 'tarmk'" ) public static final String DIRECTORY = "repository.home"; @Property( label = "Mode", - description="TarMK mode (64 for memory mapping, 32 for normal file access)" + description = "TarMK mode (64 for memory mapping, 32 for normal file access)" ) public static final String MODE = "tarmk.mode"; @@ -140,10 +142,10 @@ public class SegmentNodeStoreService extends ProxyNodeStore @Property(options = { @PropertyOption(name = "CLEAN_ALL", value = "CLEAN_ALL"), @PropertyOption(name = "CLEAN_NONE", value = "CLEAN_NONE"), - @PropertyOption(name = "CLEAN_OLD", value = "CLEAN_OLD") }, + @PropertyOption(name = "CLEAN_OLD", value = "CLEAN_OLD")}, value = "CLEAN_OLD", label = "Cleanup Strategy", - description = "Cleanup strategy used for live in memory segment references while performing cleanup. "+ + description = "Cleanup strategy used for live in memory segment references while performing cleanup. " + "1. CLEAN_NONE: All in memory references are considered valid, " + "2. CLEAN_OLD: Only in memory references older than a " + "certain age are considered valid (compaction.cleanup.timestamp), " + @@ -241,51 +243,108 @@ public class SegmentNodeStoreService extends ProxyNodeStore private WhiteboardExecutor executor; private boolean customBlobStore; + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + @Override - protected synchronized SegmentNodeStore getNodeStore() { - checkState(delegate != null, "service must be activated when used"); - return delegate; + protected SegmentNodeStore getNodeStore() { + lock.readLock().lock(); + + try { + return safeGetNodeStore(); + } finally { + lock.readLock().unlock(); + } + } @Activate private void activate(ComponentContext context) throws IOException { + lock.writeLock().lock(); + + try { + safeActivate(context); + } finally { + lock.writeLock().unlock(); + } + } + + @Deactivate + private void deactivate() { + lock.writeLock().lock(); + + try { + safeDeactivate(); + } finally { + lock.writeLock().unlock(); + } + } + + protected void bindBlobStore(BlobStore blobStore) throws IOException { + lock.writeLock().lock(); + + try { + safeBindBlobStore(blobStore); + } finally { + lock.writeLock().unlock(); + } + } + + protected void unbindBlobStore(BlobStore blobStore) { + lock.writeLock().lock(); + + try { + safeUnbindBlobStore(blobStore); + } finally { + lock.writeLock().unlock(); + } + } + + private SegmentNodeStore safeGetNodeStore() { + return checkNotNull(delegate, "service must be activated when used"); + } + + private void safeActivate(ComponentContext context) throws IOException { this.context = context; this.customBlobStore = Boolean.parseBoolean(lookup(context, CUSTOM_BLOB_STORE)); - if (blobStore == null && customBlobStore) { - log.info("BlobStore use enabled. SegmentNodeStore would be initialized when BlobStore would be available"); - } else { - registerNodeStore(); - } + registerNodeStore(); } - public void registerNodeStore() throws IOException { - if (registerSegmentStore()) { - boolean standby = toBoolean(lookup(context, STANDBY), false); - providerRegistration = context.getBundleContext().registerService( - SegmentStoreProvider.class.getName(), this, null); - if (!standby) { - Dictionary props = new Hashtable(); - props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName()); - props.put("oak.nodestore.description", new String[]{"nodeStoreType=segment"}); - storeRegistration = context.getBundleContext().registerService( - NodeStore.class.getName(), this, props); - } + private void safeDeactivate() { + unregisterNodeStore(); + } + + private void safeBindBlobStore(BlobStore blobStore) throws IOException { + unregisterNodeStore(); + this.blobStore = blobStore; + registerNodeStore(); + } + + private void safeUnbindBlobStore(BlobStore blobStore) { + if (this.blobStore == blobStore) { + unregisterNodeStore(); + this.blobStore = null; } } - public synchronized boolean registerSegmentStore() throws IOException { + private void registerNodeStore() throws IOException { if (context == null) { log.info("Component still not activated. Ignoring the initialization call"); - return false; + return; } + + if (blobStore == null && customBlobStore) { + log.info("BlobStore use enabled. SegmentNodeStore would be initialized when BlobStore would be available"); + return; + } + Dictionary properties = context.getProperties(); name = String.valueOf(properties.get(NAME)); String directory = lookup(context, DIRECTORY); if (directory == null) { directory = "tarmk"; - }else{ + } else { directory = FilenameUtils.concat(directory, "segmentstore"); } @@ -384,7 +443,7 @@ public class SegmentNodeStoreService extends ProxyNodeStore try { String repoId = ClusterRepositoryInfo.createId(delegate); ((SharedDataStore) blobStore).addMetadataRecord(new ByteArrayInputStream(new byte[0]), - SharedStoreRecordType.REPOSITORY.getNameFromId(repoId)); + SharedStoreRecordType.REPOSITORY.getNameFromId(repoId)); } catch (Exception e) { throw new IOException("Could not register a unique repositoryId", e); } @@ -414,52 +473,25 @@ public class SegmentNodeStoreService extends ProxyNodeStore "Segment node store compaction strategy settings"); log.info("SegmentNodeStore initialized"); - return true; - } - private static String lookup(ComponentContext context, String property) { - if (context.getProperties().get(property) != null) { - return context.getProperties().get(property).toString(); - } - if (context.getBundleContext().getProperty(property) != null) { - return context.getBundleContext().getProperty(property); - } - return null; - } - - @Deactivate - public synchronized void deactivate() { - unregisterNodeStore(); - - if (observerTracker != null) { - observerTracker.stop(); - } - if (gcMonitor != null) { - gcMonitor.stop(); - } - delegate = null; - if (store != null) { - store.close(); - store = null; + boolean standby = toBoolean(lookup(context, STANDBY), false); + providerRegistration = context.getBundleContext().registerService( + SegmentStoreProvider.class.getName(), this, null); + if (!standby) { + Dictionary props = new Hashtable(); + props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName()); + props.put("oak.nodestore.description", new String[]{"nodeStoreType=segment"}); + storeRegistration = context.getBundleContext().registerService( + NodeStore.class.getName(), this, props); } } - protected void bindBlobStore(BlobStore blobStore) throws IOException { - this.blobStore = blobStore; - registerNodeStore(); - } - - protected void unbindBlobStore(BlobStore blobStore){ - this.blobStore = null; - unregisterNodeStore(); - } - private void unregisterNodeStore() { - if(providerRegistration != null){ + if (providerRegistration != null) { providerRegistration.unregister(); providerRegistration = null; } - if(storeRegistration != null){ + if (storeRegistration != null) { storeRegistration.unregister(); storeRegistration = null; } @@ -487,6 +519,28 @@ public class SegmentNodeStoreService extends ProxyNodeStore executor.stop(); executor = null; } + + if (observerTracker != null) { + observerTracker.stop(); + } + if (gcMonitor != null) { + gcMonitor.stop(); + } + delegate = null; + if (store != null) { + store.close(); + store = null; + } + } + + private static String lookup(ComponentContext context, String property) { + if (context.getProperties().get(property) != null) { + return context.getProperties().get(property).toString(); + } + if (context.getBundleContext().getProperty(property) != null) { + return context.getBundleContext().getProperty(property); + } + return null; } /** @@ -510,4 +564,5 @@ public class SegmentNodeStoreService extends ProxyNodeStore public String toString() { return name + ": " + delegate; } + }