diff --git oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java index 2ad9a73a2e..bfbb2cd424 100644 --- oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java +++ oak-benchmarks/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java @@ -166,6 +166,15 @@ public class BenchmarkRunner { .withOptionalArg().ofType(Integer.class).defaultsTo(1000); OptionSpec vgcMaxAge = parser.accepts("vgcMaxAge", "Continuous DocumentNodeStore VersionGC max age in sec (RDB only)") .withRequiredArg().ofType(Integer.class).defaultsTo(-1); + OptionSpec coldSyncInterval = parser.accepts("coldSyncInterval", "interval between sync cycles in sec (Segment-Tar-Cold only)") + .withRequiredArg().ofType(Integer.class).defaultsTo(5); + OptionSpec coldUseDataStore = parser.accepts("useDataStore", "Whether to use a datastore in the cold standby topology") + .withOptionalArg().ofType(Boolean.class) + .defaultsTo(Boolean.TRUE); + OptionSpec coldShareDataStore = parser.accepts("shareDataStore", "Whether to share the datastore for primary and standby in the cold standby topology") + .withOptionalArg().ofType(Boolean.class) + .defaultsTo(Boolean.FALSE); + OptionSpec verbose = parser.accepts("verbose", "Enable verbose output"); OptionSpec nonOption = parser.nonOptions(); OptionSpec help = parser.acceptsAll(asList("h", "?", "help"), "show help").forHelp(); @@ -203,6 +212,9 @@ public class BenchmarkRunner { mmap.value(options)), OakRepositoryFixture.getSegmentTarWithBlobStore(base.value(options), 256, cacheSize, mmap.value(options), fdsCache.value(options)), + OakRepositoryFixture.getSegmentTarWithColdStandby(base.value(options), 256, cacheSize, + mmap.value(options), coldUseDataStore.value(options), fdsCache.value(options), + coldSyncInterval.value(options), coldShareDataStore.value(options)), OakRepositoryFixture.getRDB(rdbjdbcuri.value(options), rdbjdbcuser.value(options), rdbjdbcpasswd.value(options), rdbjdbctableprefix.value(options), dropDBAfterTest.value(options), cacheSize * MB, vgcMaxAge.value(options)), diff --git oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/OakFixture.java oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/OakFixture.java index 1079ab7cbb..2f2f602baf 100644 --- oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/OakFixture.java +++ oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/OakFixture.java @@ -29,6 +29,7 @@ import com.google.common.base.Predicate; import com.google.common.base.Splitter; import com.google.common.base.Strings; import org.apache.jackrabbit.oak.Oak; +import org.apache.jackrabbit.oak.fixture.SegmentTarFixture.SegmentTarFixtureBuilder; import org.apache.jackrabbit.oak.plugins.document.DocumentMK; import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector; @@ -62,6 +63,7 @@ public abstract class OakFixture { public static final String OAK_SEGMENT_TAR = "Oak-Segment-Tar"; public static final String OAK_SEGMENT_TAR_DS = "Oak-Segment-Tar-DS"; + public static final String OAK_SEGMENT_TAR_COLD = "Oak-Segment-Tar-Cold"; public static final String OAK_COMPOSITE_STORE = "Oak-Composite-Store"; public static final String OAK_COMPOSITE_MEMORY_STORE = "Oak-Composite-Memory-Store"; @@ -308,16 +310,41 @@ public abstract class OakFixture { } public static OakFixture getSegmentTar(final String name, final File base, - final int maxFileSizeMB, final int cacheSizeMB, final boolean memoryMapping, - final boolean useBlobStore) { - return getSegmentTar(name, base, maxFileSizeMB, cacheSizeMB, memoryMapping, useBlobStore, 0); + final int maxFileSizeMB, final int cacheSizeMB, final boolean memoryMapping) { + SegmentTarFixtureBuilder builder = SegmentTarFixtureBuilder.segmentTarFixtureBuilder(name, base); + builder.withMaxFileSize(maxFileSizeMB) + .withSegmentCacheSize(cacheSizeMB) + .withMemoryMapping(memoryMapping); + + return builder.build(); } public static OakFixture getSegmentTar(final String name, final File base, final int maxFileSizeMB, final int cacheSizeMB, final boolean memoryMapping, final boolean useBlobStore, final int dsCacheInMB) { - return new SegmentTarFixture(name, base, maxFileSizeMB, cacheSizeMB, memoryMapping, useBlobStore, dsCacheInMB); + SegmentTarFixtureBuilder builder = SegmentTarFixtureBuilder.segmentTarFixtureBuilder(name, base); + builder.withMaxFileSize(maxFileSizeMB) + .withSegmentCacheSize(cacheSizeMB) + .withMemoryMapping(memoryMapping) + .withBlobStore(useBlobStore) + .withDSCacheSize(dsCacheInMB); + + return builder.build(); } + + public static OakFixture getSegmentTar(final String name, final File base, + final int maxFileSizeMB, final int cacheSizeMB, final boolean memoryMapping, + final boolean useBlobStore, final int dsCacheInMB, + final boolean withColdStandby, final int syncInterval, final boolean shareBlobStore) { + SegmentTarFixtureBuilder builder = SegmentTarFixtureBuilder.segmentTarFixtureBuilder(name, base); + builder.withMaxFileSize(maxFileSizeMB) + .withSegmentCacheSize(cacheSizeMB) + .withMemoryMapping(memoryMapping) + .withBlobStore(useBlobStore) + .withDSCacheSize(dsCacheInMB); + + return new SegmentTarFixture(builder, withColdStandby, syncInterval, shareBlobStore); + } public static OakFixture getCompositeStore(final String name, final File base, final int maxFileSizeMB, final int cacheSizeMB, final boolean memoryMapping, diff --git oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java index bce928ee2b..bd529bee42 100644 --- oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java +++ oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/OakRepositoryFixture.java @@ -97,8 +97,7 @@ public class OakRepositoryFixture implements RepositoryFixture { public static RepositoryFixture getSegmentTar(File base, int maxFileSizeMB, int cacheSizeMB, boolean memoryMapping) { return new OakRepositoryFixture(OakFixture - .getSegmentTar(OakFixture.OAK_SEGMENT_TAR, base, maxFileSizeMB, cacheSizeMB, - memoryMapping, false)); + .getSegmentTar(OakFixture.OAK_SEGMENT_TAR, base, maxFileSizeMB, cacheSizeMB, memoryMapping)); } public static RepositoryFixture getSegmentTarWithBlobStore(File base, int maxFileSizeMB, @@ -107,6 +106,13 @@ public class OakRepositoryFixture implements RepositoryFixture { .getSegmentTar(OakFixture.OAK_SEGMENT_TAR_DS, base, maxFileSizeMB, cacheSizeMB, memoryMapping, true, dsCacheInMB)); } + + public static RepositoryFixture getSegmentTarWithColdStandby(File base, int maxFileSizeMB, + int cacheSizeMB, boolean memoryMapping, boolean useBlobStore, int dsCacheInMB, int syncInterval, boolean shareBlobStore) { + return new OakRepositoryFixture(OakFixture + .getSegmentTar(OakFixture.OAK_SEGMENT_TAR_COLD, base, maxFileSizeMB, cacheSizeMB, + memoryMapping, useBlobStore, dsCacheInMB, true, syncInterval, shareBlobStore)); + } public static RepositoryFixture getCompositeStore(File base, int maxFileSizeMB, int cacheSizeMB, final boolean memoryMapping, int mounts, int pathsPerMount) { diff --git oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java index f74f351048..ee215e318d 100644 --- oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java +++ oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java @@ -20,79 +20,236 @@ package org.apache.jackrabbit.oak.fixture; import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder; import java.io.File; +import java.net.ServerSocket; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; import org.apache.commons.io.FileUtils; +import org.apache.jackrabbit.core.data.FileDataStore; import org.apache.jackrabbit.oak.Oak; +import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore; +import org.apache.jackrabbit.oak.segment.SegmentId; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; +import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener; import org.apache.jackrabbit.oak.segment.file.FileStore; import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; +import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.apache.jackrabbit.oak.stats.StatisticsProvider; class SegmentTarFixture extends OakFixture { + /** + * Listener instance doing nothing on a {@code SegmentNotFoundException} + */ + SegmentNotFoundExceptionListener IGNORE_SNFE = new SegmentNotFoundExceptionListener() { + @Override + public void notify(@Nonnull SegmentId id, @Nonnull SegmentNotFoundException snfe) { } + }; + + private static final int MB = 1024 * 1024; - private FileStore[] stores; - - private BlobStoreFixture[] blobStoreFixtures = new BlobStoreFixture[0]; - + static class SegmentTarFixtureBuilder { + private final String name; + private final File base; + + private int maxFileSize; + private int segmentCacheSize; + private boolean memoryMapping; + private boolean useBlobStore; + private int dsCacheSize; + + public static SegmentTarFixtureBuilder segmentTarFixtureBuilder(String name, File directory) { + return new SegmentTarFixtureBuilder(name, directory); + } + + private SegmentTarFixtureBuilder(String name, File base) { + this.name = name; + this.base = base; + } + + public SegmentTarFixtureBuilder withMaxFileSize(int maxFileSize) { + this.maxFileSize = maxFileSize; + return this; + } + + public SegmentTarFixtureBuilder withSegmentCacheSize(int segmentCacheSize) { + this.segmentCacheSize = segmentCacheSize; + return this; + } + + public SegmentTarFixtureBuilder withMemoryMapping(boolean memoryMapping) { + this.memoryMapping = memoryMapping; + return this; + } + + public SegmentTarFixtureBuilder withBlobStore(boolean useBlobStore) { + this.useBlobStore = useBlobStore; + return this; + } + + public SegmentTarFixtureBuilder withDSCacheSize(int dsCacheSize) { + this.dsCacheSize = dsCacheSize; + return this; + } + + public SegmentTarFixture build() { + return new SegmentTarFixture(this); + } + } + private final File base; - - private final int maxFileSizeMB; - - private final int cacheSizeMB; - + private final int maxFileSize; + private final int segmentCacheSize; private final boolean memoryMapping; - private final boolean useBlobStore; + private final int dsCacheSize; + + private final boolean withColdStandby; + private final int syncInterval; + private final boolean shareBlobStore; + + private final File parentPath; - private final int dsCacheSizeInMB; - - public SegmentTarFixture(String name, File base, int maxFileSizeMB, int cacheSizeMB, - boolean memoryMapping, boolean useBlobStore, int dsCacheSizeInMB) { - super(name); - this.base = base; - this.maxFileSizeMB = maxFileSizeMB; - this.cacheSizeMB = cacheSizeMB; - this.memoryMapping = memoryMapping; - this.useBlobStore = useBlobStore; - this.dsCacheSizeInMB = dsCacheSizeInMB; + private FileStore[] stores; + private BlobStoreFixture[] blobStoreFixtures; + + private StandbyServerSync[] serverSyncs; + private StandbyClientSync[] clientSyncs; + private ScheduledExecutorService[] executors; + + public SegmentTarFixture(SegmentTarFixtureBuilder builder) { + this(builder, false, -1, false); + } + + public SegmentTarFixture(SegmentTarFixtureBuilder builder, boolean withColdStandby, int syncInterval) { + this(builder, withColdStandby, syncInterval, false); + } + + public SegmentTarFixture(SegmentTarFixtureBuilder builder, boolean withColdStandby, int syncInterval, boolean shareBlobStore) { + super(builder.name); + this.base = builder.base; + this.parentPath = new File(base, unique); + + this.maxFileSize = builder.maxFileSize; + this.segmentCacheSize = builder.segmentCacheSize; + this.memoryMapping = builder.memoryMapping; + this.useBlobStore = builder.useBlobStore; + this.dsCacheSize = builder.dsCacheSize; + + this.withColdStandby = withColdStandby; + this.syncInterval = syncInterval; + this.shareBlobStore = shareBlobStore; } @Override public Oak getOak(int clusterId) throws Exception { - FileStore fs = fileStoreBuilder(base) - .withMaxFileSize(maxFileSizeMB) - .withSegmentCacheSize(cacheSizeMB) - .withMemoryMapping(memoryMapping) - .build(); + FileStoreBuilder fileStoreBuilder = fileStoreBuilder(parentPath) + .withMaxFileSize(maxFileSize) + .withSegmentCacheSize(segmentCacheSize) + .withMemoryMapping(memoryMapping); + + + if (useBlobStore) { + FileDataStore fds = new FileDataStore(); + fds.setMinRecordLength(4092); + fds.init(parentPath.getAbsolutePath()); + BlobStore blobStore = new DataStoreBlobStore(fds); + + fileStoreBuilder.withBlobStore(blobStore); + } + + FileStore fs = fileStoreBuilder.build(); return newOak(SegmentNodeStoreBuilders.builder(fs).build()); } @Override public Oak[] setUpCluster(int n, StatisticsProvider statsProvider) throws Exception { - Oak[] cluster = new Oak[n]; - stores = new FileStore[cluster.length]; - if (useBlobStore) { - blobStoreFixtures = new BlobStoreFixture[cluster.length]; + int fileStoresLength = n; + int blobStoresLength = 0; + + if (withColdStandby) { + fileStoresLength = 2 * n; + + if (useBlobStore) { + if (shareBlobStore) { + blobStoresLength = n; + } else { + blobStoresLength = 2 * n; + } + } + + serverSyncs = new StandbyServerSync[n]; + clientSyncs = new StandbyClientSync[n]; + executors = new ScheduledExecutorService[n]; + } else { + if (useBlobStore) { + blobStoresLength = n; + } } - + + Oak[] cluster = new Oak[n]; + stores = new FileStore[fileStoresLength]; + blobStoreFixtures = new BlobStoreFixture[blobStoresLength]; + for (int i = 0; i < cluster.length; i++) { BlobStore blobStore = null; if (useBlobStore) { - blobStoreFixtures[i] = BlobStoreFixture.create(base, true, dsCacheSizeInMB, statsProvider); + blobStoreFixtures[i] = BlobStoreFixture.create(parentPath, true, dsCacheSize, statsProvider); blobStore = blobStoreFixtures[i].setUp(); } - FileStoreBuilder builder = fileStoreBuilder(new File(base, unique)); + FileStoreBuilder builder = fileStoreBuilder(new File(parentPath, "primary-" + i)); if (blobStore != null) { builder.withBlobStore(blobStore); } + stores[i] = builder - .withMaxFileSize(maxFileSizeMB) + .withMaxFileSize(maxFileSize) .withStatisticsProvider(statsProvider) - .withSegmentCacheSize(cacheSizeMB) + .withSegmentCacheSize(segmentCacheSize) .withMemoryMapping(memoryMapping) .build(); + + if (withColdStandby) { + builder = fileStoreBuilder(new File(parentPath, "standby-" + i)); + + if (useBlobStore) { + if (shareBlobStore) { + builder.withBlobStore(blobStore); + } else { + blobStoreFixtures[n + i] = BlobStoreFixture.create(parentPath, true, dsCacheSize, statsProvider); + blobStore = blobStoreFixtures[n + i].setUp(); + } + } + + stores[n + i] = builder + .withMaxFileSize(maxFileSize) + .withStatisticsProvider(statsProvider) + .withSegmentCacheSize(segmentCacheSize) + .withMemoryMapping(memoryMapping) + .withSnfeListener(IGNORE_SNFE) + .build(); + + int port = 0; + try (ServerSocket socket = new ServerSocket(0)) { + port = socket.getLocalPort(); + } + + serverSyncs[i] = new StandbyServerSync(port, stores[i], 1 * MB); + clientSyncs[i] = new StandbyClientSync("127.0.0.1", port, stores[n + i], false, 60_000, false); + + serverSyncs[i].start(); + clientSyncs[i].start(); + + executors[i] = Executors.newScheduledThreadPool(1); + executors[i].scheduleAtFixedRate(clientSyncs[i], 0, syncInterval, TimeUnit.SECONDS); + } + cluster[i] = newOak(SegmentNodeStoreBuilders.builder(stores[i]).build()); } return cluster; @@ -100,13 +257,31 @@ class SegmentTarFixture extends OakFixture { @Override public void tearDownCluster() { + if (withColdStandby) { + for (StandbyClientSync clientSync : clientSyncs) { + clientSync.close(); + } + + for (StandbyServerSync serverSync : serverSyncs) { + serverSync.close(); + } + + for (ExecutorService executor : executors) { + executor.shutdownNow(); + } + } + for (FileStore store : stores) { store.close(); } - for (BlobStoreFixture blobStore : blobStoreFixtures) { - blobStore.tearDown(); + + if (blobStoreFixtures != null) { + for (BlobStoreFixture bsf : blobStoreFixtures) { + bsf.tearDown(); + } } - FileUtils.deleteQuietly(new File(base, unique)); + + FileUtils.deleteQuietly(parentPath); } public BlobStoreFixture[] getBlobStoreFixtures() { diff --git oak-run/src/main/java/org/apache/jackrabbit/oak/run/ServerCommand.java oak-run/src/main/java/org/apache/jackrabbit/oak/run/ServerCommand.java index 9f0fb2009c..d799070ad5 100644 --- oak-run/src/main/java/org/apache/jackrabbit/oak/run/ServerCommand.java +++ oak-run/src/main/java/org/apache/jackrabbit/oak/run/ServerCommand.java @@ -122,7 +122,7 @@ class ServerCommand implements Command { if (baseFile == null) { throw new IllegalArgumentException("Required argument base missing."); } - oakFixture = OakFixture.getSegmentTar(OakFixture.OAK_SEGMENT_TAR, baseFile, 256, cacheSize, mmap.value(options), false); + oakFixture = OakFixture.getSegmentTar(OakFixture.OAK_SEGMENT_TAR, baseFile, 256, cacheSize, mmap.value(options)); } else if (fix.equals(OakFixture.OAK_RDB)) { oakFixture = OakFixture.getRDB(OakFixture.OAK_RDB, rdbjdbcuri.value(options), rdbjdbcuser.value(options), rdbjdbcpasswd.value(options), rdbjdbctableprefix.value(options), false, cacheSize, -1);