Index: oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java =================================================================== --- oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java (revision 1855035) +++ oak-run-commons/src/main/java/org/apache/jackrabbit/oak/fixture/SegmentTarFixture.java (working copy) @@ -304,8 +304,13 @@ try (ServerSocket socket = new ServerSocket(0)) { port = socket.getLocalPort(); } - - serverSyncs[i] = new StandbyServerSync(port, stores[i], 1 * MB, secure); + + serverSyncs[i] = StandbyServerSync.builder() + .withPort(port) + .withFileStore(stores[i]) + .withBlobChunkSize(1 * MB) + .withSecureConnection(secure) + .build(); clientSyncs[i] = new StandbyClientSync("127.0.0.1", port, stores[n + i], secure, DEFAULT_TIMEOUT, false, new File(StandardSystemProperty.JAVA_IO_TMPDIR.value())); if (!oneShotRun) { Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (revision 1855035) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (working copy) @@ -19,6 +19,8 @@ package org.apache.jackrabbit.oak.segment.standby.server; +import static com.google.common.base.Preconditions.checkState; + import java.security.cert.CertificateException; import java.util.concurrent.TimeUnit; @@ -95,6 +97,14 @@ private CommunicationObserver observer; + private StandbyHeadReader standbyHeadReader; + + private StandbySegmentReader standbySegmentReader; + + private StandbyReferencesReader standbyReferencesReader; + + private StandbyBlobReader standbyBlobReader; + private Builder(final int port, final StoreProvider storeProvider, final int blobChunkSize) { this.port = port; this.storeProvider = storeProvider; @@ -108,23 +118,60 @@ Builder allowIPRanges(String[] allowedClientIPRanges) { this.allowedClientIPRanges = allowedClientIPRanges; - return this; } Builder withStateConsumer(StateConsumer stateConsumer) { this.stateConsumer = stateConsumer; - return this; } Builder withObserver(CommunicationObserver observer) { this.observer = observer; + return this; + } + + Builder withStandbyHeadReader(StandbyHeadReader standbyHeadReader) { + this.standbyHeadReader = standbyHeadReader; + return this; + } + Builder withStandbySegmentReader(StandbySegmentReader standbySegmentReader) { + this.standbySegmentReader = standbySegmentReader; + return this; + } + + Builder withStandbyReferencesReader(StandbyReferencesReader standbyReferencesReader) { + this.standbyReferencesReader = standbyReferencesReader; + return this; + } + + Builder withStandbyBlobReader(StandbyBlobReader standbyBlobReader) { + this.standbyBlobReader = standbyBlobReader; return this; } StandbyServer build() throws CertificateException, SSLException { + checkState(storeProvider != null); + + FileStore store = storeProvider.provideStore(); + + if (standbyReferencesReader == null) { + standbyReferencesReader = new DefaultStandbyReferencesReader(store); + } + + if (standbyBlobReader == null) { + standbyBlobReader = new DefaultStandbyBlobReader(store.getBlobStore()); + } + + if (standbySegmentReader == null) { + standbySegmentReader = new DefaultStandbySegmentReader(store); + } + + if (standbyHeadReader == null) { + standbyHeadReader = new DefaultStandbyHeadReader(store, READ_HEAD_TIMEOUT); + } + return new StandbyServer(this); } @@ -151,8 +198,9 @@ b.childOption(ChannelOption.SO_KEEPALIVE, true); b.childHandler(new ChannelInitializer() { + @Override - public void initChannel(SocketChannel ch) throws Exception { + public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); p.addLast(new ClientFilterHandler(new ClientIpFilter(builder.allowedClientIPRanges))); @@ -187,17 +235,16 @@ // Handlers - FileStore store = builder.storeProvider.provideStore(); - - p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(store, READ_HEAD_TIMEOUT))); - p.addLast(new GetSegmentRequestHandler(new DefaultStandbySegmentReader(store))); - p.addLast(new GetBlobRequestHandler(new DefaultStandbyBlobReader(store.getBlobStore()))); - p.addLast(new GetReferencesRequestHandler(new DefaultStandbyReferencesReader(store))); + p.addLast(new GetHeadRequestHandler(builder.standbyHeadReader)); + p.addLast(new GetSegmentRequestHandler(builder.standbySegmentReader)); + p.addLast(new GetBlobRequestHandler(builder.standbyBlobReader)); + p.addLast(new GetReferencesRequestHandler(builder.standbyReferencesReader)); // Exception handler p.addLast(new ExceptionHandler()); } + }); } Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java (revision 1855035) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java (working copy) @@ -19,6 +19,8 @@ package org.apache.jackrabbit.oak.segment.standby.server; +import static com.google.common.base.Preconditions.checkArgument; + import java.io.Closeable; import java.lang.management.ManagementFactory; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,6 +39,95 @@ public class StandbyServerSync implements StandbyStatusMBean, StateConsumer, StoreProvider, Closeable { + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private int port; + + private FileStore fileStore; + + private int blobChunkSize; + + private boolean secure; + + private String[] allowedClientIPRanges; + + private StandbyBlobReader standbyBlobReader; + + private StandbyHeadReader standbyHeadReader; + + private StandbyReferencesReader standbyReferencesReader; + + private StandbySegmentReader standbySegmentReader; + + private Builder() { + // Prevent external instantiation + } + + public Builder withPort(int port) { + checkArgument(port > 0, "port"); + this.port = port; + return this; + } + + public Builder withFileStore(FileStore fileStore) { + checkArgument(fileStore != null, "fileStore"); + this.fileStore = fileStore; + return this; + } + + public Builder withBlobChunkSize(int blobChunkSize) { + checkArgument(blobChunkSize > 0, "blobChunkSize"); + this.blobChunkSize = blobChunkSize; + return this; + } + + public Builder withSecureConnection(boolean secure) { + this.secure = secure; + return this; + } + + public Builder withAllowedClientIPRanges(String[] allowedClientIPRanges) { + this.allowedClientIPRanges = allowedClientIPRanges; + return this; + } + + Builder withStandbyBlobReader(StandbyBlobReader standbyBlobReader) { + checkArgument(standbyBlobReader != null, "standbyBlobReader"); + this.standbyBlobReader = standbyBlobReader; + return this; + } + + Builder withStandbyHeadReader(StandbyHeadReader standbyHeadReader) { + checkArgument(standbyHeadReader != null, "standbyHeadReader"); + this.standbyHeadReader = standbyHeadReader; + return this; + } + + Builder withStandbyReferencesReader(StandbyReferencesReader standbyReferencesReader) { + checkArgument(standbyReferencesReader != null, "standbyReferencesReader"); + this.standbyReferencesReader = standbyReferencesReader; + return this; + } + + Builder withStandbySegmentReader(StandbySegmentReader standbySegmentReader) { + checkArgument(standbySegmentReader != null, "standbySegmentReader"); + this.standbySegmentReader = standbySegmentReader; + return this; + } + + public StandbyServerSync build() { + checkArgument(port > 0); + checkArgument(fileStore != null); + checkArgument(blobChunkSize > 0); + return new StandbyServerSync(this); + } + + } + private static final Logger log = LoggerFactory.getLogger(StandbyServer.class); private final FileStore fileStore; @@ -55,26 +146,26 @@ private final AtomicBoolean running = new AtomicBoolean(false); - private StandbyServer server; + private final StandbyBlobReader standbyBlobReader; - public StandbyServerSync(final int port, final FileStore fileStore, final int blobChunkSize) { - this(port, fileStore, blobChunkSize, null, false); - } + private final StandbyHeadReader standbyHeadReader; - public StandbyServerSync(final int port, final FileStore fileStore, final int blobChunkSize, final boolean secure) { - this(port, fileStore, blobChunkSize, null, secure); - } + private final StandbyReferencesReader standbyReferencesReader; - public StandbyServerSync(final int port, final FileStore fileStore, final int blobChunkSize, final String[] allowedClientIPRanges) { - this(port, fileStore, blobChunkSize, allowedClientIPRanges, false); - } + private final StandbySegmentReader standbySegmentReader; + + private StandbyServer server; - public StandbyServerSync(final int port, final FileStore fileStore, final int blobChunkSize, final String[] allowedClientIPRanges, final boolean secure) { - this.port = port; - this.fileStore = fileStore; - this.blobChunkSize = blobChunkSize; - this.allowedClientIPRanges = allowedClientIPRanges; - this.secure = secure; + private StandbyServerSync(Builder builder) { + this.port = builder.port; + this.fileStore = builder.fileStore; + this.blobChunkSize = builder.blobChunkSize; + this.allowedClientIPRanges = builder.allowedClientIPRanges; + this.secure = builder.secure; + this.standbyBlobReader = builder.standbyBlobReader; + this.standbyHeadReader = builder.standbyHeadReader; + this.standbyReferencesReader = builder.standbyReferencesReader; + this.standbySegmentReader = builder.standbySegmentReader; this.observer = new CommunicationObserver("primary"); final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); @@ -106,11 +197,15 @@ try { server = StandbyServer.builder(port, this, blobChunkSize) - .secure(secure) - .allowIPRanges(allowedClientIPRanges) - .withStateConsumer(this) - .withObserver(observer) - .build(); + .secure(secure) + .allowIPRanges(allowedClientIPRanges) + .withStateConsumer(this) + .withObserver(observer) + .withStandbyBlobReader(standbyBlobReader) + .withStandbyHeadReader(standbyHeadReader) + .withStandbyReferencesReader(standbyReferencesReader) + .withStandbySegmentReader(standbySegmentReader) + .build(); server.start(); state = STATUS_RUNNING; Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStoreService.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStoreService.java (revision 1855035) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/StandbyStoreService.java (working copy) @@ -159,7 +159,14 @@ String[] ranges = config.primary_allowed$_$client$_$ip$_$ranges(); boolean secure = config.secure(); - StandbyServerSync standbyServerSync = new StandbyServerSync(port, fileStore, BLOB_CHUNK_SIZE, ranges, secure); + StandbyServerSync standbyServerSync = StandbyServerSync.builder() + .withPort(port) + .withFileStore(fileStore) + .withBlobChunkSize(BLOB_CHUNK_SIZE) + .withAllowedClientIPRanges(ranges) + .withSecureConnection(secure) + .build(); + closer.register(standbyServerSync); standbyServerSync.start(); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java (revision 1855035) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java (working copy) @@ -67,7 +67,12 @@ serverStore.flush(); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), serverStore, MB, false); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(serverStore) + .withBlobChunkSize(MB) + .withSecureConnection(false) + .build(); StandbyClientSync clientSync = new StandbyClientSync(getServerHost(), serverPort.getPort(), clientStore, false, getClientTimeout(), false, folder.newFolder()); ) { serverSync.start(); @@ -87,7 +92,12 @@ storeS.flush(); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), storeS, MB, true); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(storeS) + .withBlobChunkSize(MB) + .withSecureConnection(true) + .build(); StandbyClientSync clientSync = new StandbyClientSync(getServerHost(), serverPort.getPort(), storeC, true, getClientTimeout(), false, folder.newFolder()); ) { serverSync.start(); @@ -155,7 +165,14 @@ addTestContent(store, "server"); serverStore.flush(); - try (StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), serverStore, MB, ssl)) { + try ( + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(serverStore) + .withBlobChunkSize(MB) + .withSecureConnection(ssl) + .build() + ) { serverSync.start(); File spoolFolder = folder.newFolder(); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java (revision 1855035) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java (working copy) @@ -150,7 +150,11 @@ // run 1: unsuccessful try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(primary) + .withBlobChunkSize(MB) + .build(); StandbyClientSync cl = new StandbyClientSync(getServerHost(), serverPort.getPort(), secondary, false, 4_000, false, spoolFolder) ) { serverSync.start(); @@ -163,7 +167,11 @@ // run 2: successful try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(primary) + .withBlobChunkSize(MB) + .build(); StandbyClientSync cl = new StandbyClientSync(getServerHost(), serverPort.getPort(), secondary, false, 4_000, false, spoolFolder) ) { serverSync.start(); @@ -200,7 +208,11 @@ NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); byte[] data = addTestContent(store, "server", blobSize); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(primary) + .withBlobChunkSize(MB) + .build(); StandbyClientSync cl = new StandbyClientSync(getServerHost(), serverPort.getPort(), secondary, false, getClientTimeout(), false, folder.newFolder()) ) { serverSync.start(); @@ -242,7 +254,11 @@ addTestContentOnTheFly(store, "server", blobSize, seed); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 8 * MB); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(primary) + .withBlobChunkSize(8 * MB) + .build(); StandbyClientSync cl = new StandbyClientSync(getServerHost(), serverPort.getPort(), secondary, false, 2 * 60 * 1000, false, folder.newFolder()) ) { serverSync.start(); @@ -281,7 +297,11 @@ NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(primary) + .withBlobChunkSize(MB) + .build(); StandbyClientSync clientSync = new StandbyClientSync(getServerHost(), serverPort.getPort(), secondary, false, getClientTimeout(), false, folder.newFolder()) ) { serverSync.start(); @@ -343,7 +363,13 @@ byte[] data = addTestContent(store, "server", blobSize); primary.flush(); - try (StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB)) { + try ( + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(primary) + .withBlobChunkSize(MB) + .build() + ) { serverSync.start(); File spoolFolder = folder.newFolder(); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java (revision 1855035) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/ExternalPrivateStoreIT.java (working copy) @@ -18,21 +18,12 @@ */ package org.apache.jackrabbit.oak.segment.standby; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; - import java.io.File; -import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; -import org.apache.jackrabbit.oak.segment.standby.server.StandbyServerSync; import org.apache.jackrabbit.oak.segment.test.TemporaryBlobStore; import org.apache.jackrabbit.oak.segment.test.TemporaryFileStore; -import org.apache.jackrabbit.oak.spi.state.NodeStore; -import org.junit.Ignore; import org.junit.Rule; -import org.junit.Test; import org.junit.rules.RuleChain; import org.junit.rules.TemporaryFolder; @@ -70,24 +61,4 @@ return true; } - @Test - @Ignore("OAK-7027") // FIXME OAK-7027 - public void testSyncFailingDueToTooShortTimeout() throws Exception { - final int blobSize = 5 * MB; - FileStore primary = getPrimary(); - FileStore secondary = getSecondary(); - - NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); - addTestContent(store, "server", blobSize); - try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB); - StandbyClientSync cl = new StandbyClientSync(getServerHost(), 60, secondary, false, getClientTimeout(), false, folder.newFolder()) - ) { - serverSync.start(); - primary.flush(); - cl.run(); - assertNotEquals(primary.getHead(), secondary.getHead()); - assertEquals(1, cl.getFailedRequests()); - } - } } Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverIPRangeIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverIPRangeIT.java (revision 1855035) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverIPRangeIT.java (working copy) @@ -157,7 +157,12 @@ NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), storeS, MB, ipRanges); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(storeS) + .withBlobChunkSize(MB) + .withAllowedClientIPRanges(ipRanges) + .build(); StandbyClientSync clientSync = new StandbyClientSync(host, serverPort.getPort(), storeC, false, getClientTimeout(), false, folder.newFolder()) ) { serverSync.start(); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverMultipleClientsTestIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverMultipleClientsTestIT.java (revision 1855035) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverMultipleClientsTestIT.java (working copy) @@ -63,7 +63,11 @@ NodeStore store = SegmentNodeStoreBuilders.builder(storeS).build(); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), storeS, MB); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(storeS) + .withBlobChunkSize(MB) + .build(); StandbyClientSync cl1 = new StandbyClientSync(getServerHost(), serverPort.getPort(), storeC, false, getClientTimeout(), false, folder.newFolder()); StandbyClientSync cl2 = new StandbyClientSync(getServerHost(), serverPort.getPort(), storeC2, false, getClientTimeout(), false, folder.newFolder()) ) { Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverSslTestIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverSslTestIT.java (revision 1855035) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/FailoverSslTestIT.java (working copy) @@ -58,7 +58,12 @@ FileStore storeS = serverFileStore.fileStore(); FileStore storeC = clientFileStore.fileStore(); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), storeS, MB, true); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(storeS) + .withBlobChunkSize(MB) + .withSecureConnection(true) + .build(); StandbyClientSync clientSync = new StandbyClientSync(getServerHost(), serverPort.getPort(), storeC, true, getClientTimeout(), false, folder.newFolder()); ) { assertTrue(synchronizeAndCompareHead(serverSync, clientSync)); @@ -70,7 +75,12 @@ FileStore storeS = serverFileStore.fileStore(); FileStore storeC = clientFileStore.fileStore(); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), storeS, MB, true); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(storeS) + .withBlobChunkSize(MB) + .withSecureConnection(true) + .build(); StandbyClientSync clientSync = new StandbyClientSync(getServerHost(), serverPort.getPort(), storeC, false, getClientTimeout(), false, folder.newFolder()); ) { assertFalse(synchronizeAndCompareHead(serverSync, clientSync)); @@ -82,7 +92,11 @@ FileStore storeS = serverFileStore.fileStore(); FileStore storeC = clientFileStore.fileStore(); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), storeS, MB); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(storeS) + .withBlobChunkSize(MB) + .build(); StandbyClientSync clientSync = new StandbyClientSync(getServerHost(), serverPort.getPort(), storeC, true, getClientTimeout(), false, folder.newFolder()); ) { assertFalse(synchronizeAndCompareHead(serverSync, clientSync)); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/MBeanIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/MBeanIT.java (revision 1855035) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/MBeanIT.java (working copy) @@ -61,7 +61,13 @@ public void testServerEmptyConfig() throws Exception { MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); ObjectName status = new ObjectName(StandbyStatusMBean.JMX_NAME + ",id=*"); - try (StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), serverFileStore.fileStore(), 1 * MB)) { + try ( + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(serverFileStore.fileStore()) + .withBlobChunkSize(1 * MB) + .build() + ) { serverSync.start(); Set instances = jmxServer.queryNames(status, null); @@ -153,7 +159,11 @@ MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); ObjectName clientStatus, serverStatus; try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), serverFileStore.fileStore(), MB); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(serverFileStore.fileStore()) + .withBlobChunkSize(MB) + .build(); StandbyClientSync clientSync = new StandbyClientSync(getServerHost(), serverPort.getPort(), clientFileStore.fileStore(), false, getClientTimeout(), false, folder.newFolder()) ) { serverSync.start(); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/RecoverTestIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/RecoverTestIT.java (revision 1855035) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/RecoverTestIT.java (working copy) @@ -62,7 +62,11 @@ addTestContent(store, "client"); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), storeS, MB); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(storeS) + .withBlobChunkSize(MB) + .build(); StandbyClientSync cl = new StandbyClientSync(getServerHost(), serverPort.getPort(), storeC, false, getClientTimeout(), false, folder.newFolder()) ) { serverSync.start(); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbySegmentBlobTestIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbySegmentBlobTestIT.java (revision 1855035) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbySegmentBlobTestIT.java (working copy) @@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import java.io.File; + import org.apache.commons.io.input.NullInputStream; import org.apache.jackrabbit.core.data.FileDataStore; import org.apache.jackrabbit.oak.api.Blob; @@ -44,8 +46,6 @@ import org.junit.rules.RuleChain; import org.junit.rules.TemporaryFolder; -import java.io.File; - public class StandbySegmentBlobTestIT extends TestBase { // `BLOB_SIZE` has to be chosen in such a way that is both: @@ -94,7 +94,11 @@ NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(primary) + .withBlobChunkSize(MB) + .build(); StandbyClientSync clientSync = new StandbyClientSync(getServerHost(), serverPort.getPort(), secondary, false, getClientTimeout(), false, folder.newFolder()) ) { serverSync.start(); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestIT.java (revision 1855035) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestIT.java (working copy) @@ -71,7 +71,11 @@ NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(primary) + .withBlobChunkSize(MB) + .build(); StandbyClientSync clientSync = new StandbyClientSync(getServerHost(), serverPort.getPort(), secondary, false, getClientTimeout(), false, folder.newFolder()) ) { serverSync.start(); @@ -111,7 +115,11 @@ NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); try ( - StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, MB); + StandbyServerSync serverSync = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(primary) + .withBlobChunkSize(MB) + .build(); StandbyClientSync clientSync = new StandbyClientSync(getServerHost(), serverPort.getPort(), secondary, false, getClientTimeout(), false, folder.newFolder()) ) { serverSync.start(); Index: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/SlowServerIT.java =================================================================== --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/SlowServerIT.java (nonexistent) +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/SlowServerIT.java (working copy) @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.segment.standby.server; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.input.NullInputStream; +import org.apache.jackrabbit.oak.api.Blob; +import org.apache.jackrabbit.oak.api.CommitFailedException; +import org.apache.jackrabbit.oak.commons.junit.TemporaryPort; +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.standby.client.StandbyClientSync; +import org.apache.jackrabbit.oak.segment.test.TemporaryBlobStore; +import org.apache.jackrabbit.oak.segment.test.TemporaryFileStore; +import org.apache.jackrabbit.oak.spi.commit.CommitInfo; +import org.apache.jackrabbit.oak.spi.commit.EmptyHook; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.junit.rules.TemporaryFolder; + +public class SlowServerIT { + + private TemporaryFolder folder = new TemporaryFolder(new File("target")); + + private TemporaryBlobStore serverBlobStore = new TemporaryBlobStore(folder); + + private TemporaryFileStore serverFileStore = new TemporaryFileStore(folder, serverBlobStore, false); + + private TemporaryBlobStore clientBlobStore = new TemporaryBlobStore(folder); + + private TemporaryFileStore clientFileStore = new TemporaryFileStore(folder, clientBlobStore, true); + + @Rule + public RuleChain chain = RuleChain.outerRule(folder) + .around(serverBlobStore) + .around(serverFileStore) + .around(clientBlobStore) + .around(clientFileStore); + + @Rule + public TemporaryPort serverPort = new TemporaryPort(); + + @Test + public void testSyncFailingDueToTooShortTimeout() throws Exception { + FileStore primary = serverFileStore.fileStore(); + FileStore secondary = clientFileStore.fileStore(); + + // Add a node on the primary that references a 5MB binary. + + createTestContent(primary); + + // Configure a StandbyBlobReader that behaves like the default one, but + // adds a 2s delay every time a binary is fetched from the Data Store. + + StandbyBlobReader blobReader = newDelayedBlobReader(2, TimeUnit.SECONDS, new DefaultStandbyBlobReader(primary.getBlobStore())); + + try ( + + // The primary uses the delayed StandbyBlobReader and is configured + // to transfer binaries in chunks of 1MB. + + StandbyServerSync server = StandbyServerSync.builder() + .withPort(serverPort.getPort()) + .withFileStore(primary) + .withBlobChunkSize(1024 * 1024) + .withStandbyBlobReader(blobReader) + .build(); + + // The client expects the server to respond withing 1s. When the + // binary is requested, the delay on the server guarantees that the + // timeout on the client will expire. + + StandbyClientSync client = new StandbyClientSync("localhost", serverPort.getPort(), secondary, false, 1000, false, folder.newFolder()) + ) { + server.start(); + client.run(); + assertNotEquals(primary.getHead(), secondary.getHead()); + assertEquals(1, client.getFailedRequests()); + } + } + + private void createTestContent(FileStore fileStore) throws CommitFailedException, IOException { + NodeStore store = SegmentNodeStoreBuilders.builder(fileStore).build(); + NodeBuilder builder = store.getRoot().builder(); + + Blob blob = store.createBlob(new NullInputStream(5 * 1024 * 102)); + builder.child("n").setProperty("data", blob); + + store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY); + + fileStore.flush(); + } + + private StandbyBlobReader newDelayedBlobReader(int delay, TimeUnit timeUnit, StandbyBlobReader wrapped) { + return new StandbyBlobReader() { + + @Override + public InputStream readBlob(String blobId) { + try { + Thread.sleep(timeUnit.toMillis(delay)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return wrapped.readBlob(blobId); + } + + @Override + public long getBlobLength(String blobId) { + return wrapped.getBlobLength(blobId); + } + + }; + } + +} Property changes on: oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/SlowServerIT.java ___________________________________________________________________ Added: svn:eol-style ## -0,0 +1 ## +native \ No newline at end of property