Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java (revision 1774032) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java (working copy) @@ -23,6 +23,8 @@ import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toInteger; import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong; import static org.apache.jackrabbit.oak.osgi.OsgiUtil.lookupConfigurationThenFramework; +import static org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.isShared; +import static org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo.getOrCreateId; import static org.apache.jackrabbit.oak.segment.CachingSegmentReader.DEFAULT_STRING_CACHE_MB; import static org.apache.jackrabbit.oak.segment.CachingSegmentReader.DEFAULT_TEMPLATE_CACHE_MB; import static org.apache.jackrabbit.oak.segment.SegmentCache.DEFAULT_SEGMENT_CACHE_MB; @@ -86,7 +88,6 @@ import org.apache.jackrabbit.oak.plugins.blob.MarkSweepGarbageCollector; import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore; import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker; -import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils; import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType; import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo; import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; @@ -107,6 +108,7 @@ import org.apache.jackrabbit.oak.spi.whiteboard.AbstractServiceTracker; import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration; import org.apache.jackrabbit.oak.spi.whiteboard.Registration; +import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard; import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor; import org.apache.jackrabbit.oak.stats.Clock; import org.apache.jackrabbit.oak.stats.StatisticsProvider; @@ -518,20 +520,24 @@ SegmentNodeStore.SegmentNodeStoreBuilder segmentNodeStoreBuilder = SegmentNodeStoreBuilders.builder(store) .withStatisticsProvider(statisticsProvider); - if (isStandbyInstance(context)) { + if (isStandbyInstance(context) || !isPrimarySegmentStore(role)) { segmentNodeStoreBuilder.dispatchChanges(false); } SegmentNodeStore segmentNodeStore = segmentNodeStoreBuilder.build(); - ObserverTracker observerTracker = new ObserverTracker(segmentNodeStore); - observerTracker.start(context.getBundleContext()); - registrations.register(asCloseable(observerTracker)); + if (isPrimarySegmentStore(role)) { + ObserverTracker observerTracker = new ObserverTracker(segmentNodeStore); + observerTracker.start(context.getBundleContext()); + registrations.register(asCloseable(observerTracker)); + } - mbeans.add(registerMBean( - whiteboard, - CheckpointMBean.class, - new SegmentCheckpointMBean(segmentNodeStore), CheckpointMBean.TYPE, - appendRole("Segment node store checkpoint management", role))); + if (isPrimarySegmentStore(role)) { + mbeans.add(registerMBean( + whiteboard, + CheckpointMBean.class, + new SegmentCheckpointMBean(segmentNodeStore), CheckpointMBean.TYPE, + appendRole("Segment node store checkpoint management", role))); + } if (descriptors) { // ensure a clusterId is initialized @@ -539,7 +545,7 @@ GenericDescriptors clusterIdDesc = new GenericDescriptors(); clusterIdDesc.put(ClusterRepositoryInfo.OAK_CLUSTERID_REPOSITORY_DESCRIPTOR_KEY, new SimpleValueFactory().createValue( - ClusterRepositoryInfo.getOrCreateId(segmentNodeStore)), true, false); + getOrCreateId(segmentNodeStore)), true, false); mbeans.add(whiteboard.register( Descriptors.class, clusterIdDesc, @@ -555,50 +561,41 @@ } // If a shared data store register the repo id in the data store - String repoId = ""; - if (SharedDataStoreUtils.isShared(blobStore)) { + if (isPrimarySegmentStore(role) && isShared(blobStore)) { + SharedDataStore sharedDataStore = (SharedDataStore) blobStore; try { - repoId = ClusterRepositoryInfo.getOrCreateId(segmentNodeStore); - ((SharedDataStore) blobStore).addMetadataRecord(new ByteArrayInputStream(new byte[0]), - SharedStoreRecordType.REPOSITORY.getNameFromId(repoId)); + sharedDataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]), SharedStoreRecordType.REPOSITORY.getNameFromId(getOrCreateId(segmentNodeStore))); } catch (Exception e) { throw new IOException("Could not register a unique repositoryId", e); } - if (blobStore instanceof BlobTrackingStore) { - final long trackSnapshotInterval = toLong(property(PROP_BLOB_SNAPSHOT_INTERVAL, context), - DEFAULT_BLOB_SNAPSHOT_INTERVAL); + final long trackSnapshotInterval = toLong(property(PROP_BLOB_SNAPSHOT_INTERVAL, context), DEFAULT_BLOB_SNAPSHOT_INTERVAL); String root = property(DIRECTORY, context); if (Strings.isNullOrEmpty(root)) { root = "repository"; } - BlobTrackingStore trackingStore = (BlobTrackingStore) blobStore; if (trackingStore.getTracker() != null) { trackingStore.getTracker().close(); } - ((BlobTrackingStore) blobStore).addTracker( - new BlobIdTracker(root, repoId, trackSnapshotInterval, (SharedDataStore) - blobStore)); + trackingStore.addTracker(new BlobIdTracker(root, getOrCreateId(segmentNodeStore), trackSnapshotInterval, sharedDataStore)); } } - if (store.getBlobStore() instanceof GarbageCollectableBlobStore) { - final long blobGcMaxAgeInSecs = toLong(property(PROP_BLOB_GC_MAX_AGE, context), DEFAULT_BLOB_GC_MAX_AGE); + if (isPrimarySegmentStore(role) && isGarbageCollectable(blobStore)) { BlobGarbageCollector gc = new MarkSweepGarbageCollector( new SegmentBlobReferenceRetriever(store), - (GarbageCollectableBlobStore) store.getBlobStore(), + (GarbageCollectableBlobStore) blobStore, executor, - TimeUnit.SECONDS.toMillis(blobGcMaxAgeInSecs), - repoId + TimeUnit.SECONDS.toMillis(getBlobGcMaxAge(context)), + getOrCreateId(segmentNodeStore) ); - mbeans.add(registerMBean( whiteboard, BlobGCMBean.class, new BlobGC(gc, executor), BlobGCMBean.TYPE, - appendRole("Segment node store blob garbage collection", role) + "Segment node store blob garbage collection" )); } @@ -623,16 +620,15 @@ appendRole("SegmentNodeStore statistics", role) )); - log.info("SegmentNodeStore initialized"); + if (isPrimarySegmentStore(role)) { + log.info("Primary SegmentNodeStore initialized"); + } else { + log.info("Secondary SegmentNodeStore initialized, role={}", role); + } // Register a factory service to expose the FileStore + registerSegmentStoreProvider(role, store, whiteboard, registrations); - registrations.register(asCloseable(whiteboard.register( - SegmentStoreProvider.class, - new DefaultSegmentStoreProvider(store), - Collections.emptyMap() - ))); - registrations.register(asCloseable(new CompositeRegistration(mbeans))); if (isStandbyInstance(context)) { @@ -639,10 +635,12 @@ return segmentNodeStore; } - Map props = new HashMap(); - props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName()); - props.put("oak.nodestore.description", new String[] {"nodeStoreType=segment"}); - registrations.register(asCloseable(whiteboard.register(NodeStore.class, segmentNodeStore, props))); + if (isPrimarySegmentStore(role)) { + Map props = new HashMap(); + props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName()); + props.put("oak.nodestore.description", new String[] {"nodeStoreType=segment"}); + registrations.register(asCloseable(whiteboard.register(NodeStore.class, segmentNodeStore, props))); + } return segmentNodeStore; } @@ -655,6 +653,22 @@ } } + private static boolean isGarbageCollectable(BlobStore store) { + return store instanceof GarbageCollectableBlobStore; + } + + private static void registerSegmentStoreProvider(String role, SegmentStore store, Whiteboard whiteboard, Closer registrations) { + Map properties = new HashMap<>(); + if (role != null) { + properties.put("role", role); + } + registrations.register(asCloseable(whiteboard.register( + SegmentStoreProvider.class, + new DefaultSegmentStoreProvider(store), + properties + ))); + } + private static SegmentGCOptions newGCOptions(ComponentContext context) { boolean pauseCompaction = toBoolean(property(PAUSE_COMPACTION, context), PAUSE_DEFAULT); int retryCount = toInteger(property(COMPACTION_RETRY_COUNT, context), RETRY_COUNT_DEFAULT); @@ -729,6 +743,10 @@ return System.getProperty(propertyName); } + private static long getBlobGcMaxAge(ComponentContext context) { + return toLong(property(PROP_BLOB_GC_MAX_AGE, context), DEFAULT_BLOB_GC_MAX_AGE); + } + private static int getSegmentCacheSize(ComponentContext context) { return toInteger(getCacheSize(SEGMENT_CACHE_SIZE, context), DEFAULT_SEGMENT_CACHE_MB); } @@ -802,4 +820,8 @@ }; } + private static boolean isPrimarySegmentStore(String role) { + return role == null; + } + }