diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java index dc562d2b32..1a1719c28b 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java @@ -64,7 +64,8 @@ class StandbyClientSyncExecution { RecordId remoteHead = getHead(); if (remoteHead == null) { - throw new IllegalStateException("Unable to fetch remote head"); + log.error("Unable to fetch remote head"); + return; } if (remoteHead.equals(store.getHead().getRecordId())) { diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java index a75787f5cd..fd5d3c6ff3 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java @@ -17,19 +17,25 @@ package org.apache.jackrabbit.oak.segment.standby.server; +import static org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.readPersistedHeadWithRetry; + +import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.file.FileStore; class DefaultStandbyHeadReader implements StandbyHeadReader { private final FileStore store; + private final long timeout; - DefaultStandbyHeadReader(FileStore store) { + DefaultStandbyHeadReader(FileStore store, long timeout) { this.store = store; + this.timeout = timeout; } @Override public String readHeadRecordId() { - return store.getRevisions().getPersistedHead().toString(); + RecordId persistedHead = readPersistedHeadWithRetry(store, timeout); + return persistedHead != null ? persistedHead.toString() : null; } } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java index 61dc9c94d7..74bfd5fbb7 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java @@ -18,12 +18,12 @@ package org.apache.jackrabbit.oak.segment.standby.server; import static com.google.common.collect.Lists.newArrayList; -import static org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.readSegmentWithRetry; import java.util.List; import java.util.UUID; import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; import org.apache.jackrabbit.oak.segment.file.FileStore; class DefaultStandbyReferencesReader implements StandbyReferencesReader { @@ -40,20 +40,21 @@ class DefaultStandbyReferencesReader implements StandbyReferencesReader { long msb = uuid.getMostSignificantBits(); long lsb = uuid.getLeastSignificantBits(); + SegmentId segmentId = store.getSegmentIdProvider().newSegmentId(msb, lsb); - Segment segment = readSegmentWithRetry(store, store.getSegmentIdProvider().newSegmentId(msb, lsb)); + if (store.containsSegment(segmentId)) { + Segment segment = store.readSegment(segmentId); - if (segment == null) { - return null; - } + List references = newArrayList(); - List references = newArrayList(); + for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) { + references.add(segment.getReferencedSegmentId(i).toString()); + } - for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) { - references.add(segment.getReferencedSegmentId(i).toString()); + return references; } - return references; + return null; } } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java index c680f3b38e..ab50b27fc5 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java @@ -17,13 +17,12 @@ package org.apache.jackrabbit.oak.segment.standby.server; -import static org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.readSegmentWithRetry; - import java.io.IOException; import java.util.UUID; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; import org.apache.jackrabbit.oak.segment.file.FileStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,24 +38,24 @@ class DefaultStandbySegmentReader implements StandbySegmentReader { } @Override - public byte[] readSegment(String segmentId) { - UUID uuid = UUID.fromString(segmentId); + public byte[] readSegment(String id) { + UUID uuid = UUID.fromString(id); long msb = uuid.getMostSignificantBits(); long lsb = uuid.getLeastSignificantBits(); - - Segment segment = readSegmentWithRetry(store, store.getSegmentIdProvider().newSegmentId(msb, lsb)); - - if (segment == null) { - return null; - } - - try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { - segment.writeTo(stream); - return stream.toByteArray(); - } catch (IOException e) { - log.warn("Error while reading segment content", e); - return null; + SegmentId segmentId = store.getSegmentIdProvider().newSegmentId(msb, lsb); + + if (store.containsSegment(segmentId)) { + Segment segment = store.readSegment(segmentId); + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + segment.writeTo(stream); + return stream.toByteArray(); + } catch (IOException e) { + log.warn("Error while reading segment content", e); + return null; + } } + + return null; } } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java index 686a06e563..cd0dcc6240 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java @@ -19,8 +19,8 @@ package org.apache.jackrabbit.oak.segment.standby.server; import java.util.concurrent.TimeUnit; -import org.apache.jackrabbit.oak.segment.Segment; -import org.apache.jackrabbit.oak.segment.SegmentId; +import com.google.common.base.Supplier; +import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.file.FileStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; public final class FileStoreUtil { private static final Logger log = LoggerFactory.getLogger(FileStoreUtil.class); + private static final long DEFAULT_SLEEP_TIME = 125L; private FileStoreUtil() { // Prevent instantiation @@ -37,14 +38,27 @@ public final class FileStoreUtil { return (int) Math.ceil((double) x / (double) y); } - static Segment readSegmentWithRetry(FileStore store, SegmentId id) { - for (int i = 0; i < 160; i++) { - if (store.containsSegment(id)) { - return store.readSegment(id); + static RecordId readPersistedHeadWithRetry(FileStore store, long timeout) { + Supplier headSupplier = () -> { + return store.getRevisions().getPersistedHead(); + }; + + if (timeout > DEFAULT_SLEEP_TIME) { + return readWithRetry(headSupplier, "persisted head", timeout); + } else { + return headSupplier.get(); + } + } + + static T readWithRetry(Supplier supplier, String supplied, long timeout) { + for (int i = 0; i < timeout / DEFAULT_SLEEP_TIME; i++) { + if (supplier.get() != null) { + return supplier.get(); } + try { - log.trace("Unable to read segment, waiting..."); - TimeUnit.MILLISECONDS.sleep(125); + log.trace("Unable to read {}, waiting...", supplied); + TimeUnit.MILLISECONDS.sleep(DEFAULT_SLEEP_TIME); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; @@ -52,5 +66,4 @@ public final class FileStoreUtil { } return null; } - } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java index e1cca9472e..5e3bd50c36 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java @@ -55,6 +55,13 @@ import org.slf4j.LoggerFactory; class StandbyServer implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(StandbyServer.class); + + /** + * If a persisted head state cannot be acquired in less than this timeout, + * the 'get head' request from the client will be discarded. + */ + static final long READ_HEAD_TIMEOUT = + Long.getLong("standby.server.timeout", 10_000L); static Builder builder(int port, StoreProvider provider, int blobChunkSize) { return new Builder(port, provider, blobChunkSize); @@ -183,7 +190,7 @@ class StandbyServer implements AutoCloseable { FileStore store = builder.storeProvider.provideStore(); - p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(store))); + p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(store, READ_HEAD_TIMEOUT))); p.addLast(new GetSegmentRequestHandler(new DefaultStandbySegmentReader(store))); p.addLast(new GetBlobRequestHandler(new DefaultStandbyBlobReader(store.getBlobStore()))); p.addLast(new GetReferencesRequestHandler(new DefaultStandbyReferencesReader(store))); diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java index 6a74e02d49..a21e1df527 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java @@ -22,9 +22,9 @@ package org.apache.jackrabbit.oak.segment.standby; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeFalse; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -37,7 +37,6 @@ import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; -import org.apache.jackrabbit.oak.commons.CIHelper; import org.apache.jackrabbit.oak.commons.junit.TemporaryPort; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; import org.apache.jackrabbit.oak.segment.file.FileStore; @@ -125,6 +124,58 @@ public abstract class DataStoreTestBase extends TestBase { public void after() { proxy.close(); } + + @Test + public void testResilientSync() throws Exception { + final int blobSize = 5 * MB; + FileStore primary = getPrimary(); + FileStore secondary = getSecondary(); + + NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); + byte[] data = addTestContent(store, "server", blobSize); + + // run 1: unsuccessful + try ( + StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB); + StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000) + ) { + serverSync.start(); + // no persisted head on primary + // sync shouldn't be successful, but shouldn't throw exception either, + // timeout too low for TarMK flush thread to kick-in + cl.run(); + assertNotEquals(primary.getHead(), secondary.getHead()); + } + + // run 2: successful + try ( + StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB); + StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000) + ) { + serverSync.start(); + // this time persisted head will be available on primary + // waited at least 4s + 4s > 5s (TarMK flush thread run frequency) + cl.run(); + assertEquals(primary.getHead(), secondary.getHead()); + } + + assertTrue(primary.getStats().getApproximateSize() < MB); + assertTrue(secondary.getStats().getApproximateSize() < MB); + + PropertyState ps = secondary.getHead().getChildNode("root") + .getChildNode("server").getProperty("testBlob"); + assertNotNull(ps); + assertEquals(Type.BINARY.tag(), ps.getType().tag()); + Blob b = ps.getValue(Type.BINARY); + assertEquals(blobSize, b.length()); + byte[] testData = new byte[blobSize]; + try ( + InputStream blobInputStream = b.getNewStream() + ) { + ByteStreams.readFully(blobInputStream, testData); + assertArrayEquals(data, testData); + } + } @Test public void testSync() throws Exception { @@ -167,8 +218,6 @@ public abstract class DataStoreTestBase extends TestBase { */ @Test public void testSyncBigBlob() throws Exception { - assumeFalse(CIHelper.jenkins()); // FIXME OAK-6678: fails on Jenkins - final long blobSize = (long) (1 * GB); final int seed = 13; diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java index 998b4b72d8..940d0ee37a 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java @@ -29,7 +29,7 @@ import org.junit.BeforeClass; public class TestBase { static final int MB = 1024 * 1024; - private static final int timeout = Integer.getInteger("standby.test.timeout", 500); + private static final int timeout = Integer.getInteger("standby.test.timeout", 5000); // Java 6 on Windows doesn't support dual IP stacks, so we will skip our // IPv6 tests. diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java index 954a1e2c8e..4f578efd2e 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java @@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.segment.standby.server; import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import java.io.File; @@ -40,7 +41,16 @@ public class DefaultStandbyHeadReaderTest { public void shouldReturnHeadSegmentId() throws Exception { try (FileStore store = newFileStore()) { store.flush(); - DefaultStandbyHeadReader reader = new DefaultStandbyHeadReader(store); + DefaultStandbyHeadReader reader = new DefaultStandbyHeadReader(store, 0L); + assertEquals(store.getRevisions().getPersistedHead().toString(), reader.readHeadRecordId()); + } + } + + @Test + public void shouldWaitForFlushAndReturnHeadSegmentId() throws Exception { + try (FileStore store = newFileStore()) { + DefaultStandbyHeadReader reader = new DefaultStandbyHeadReader(store, 10_000L); + assertNotNull(reader.readHeadRecordId()); assertEquals(store.getRevisions().getPersistedHead().toString(), reader.readHeadRecordId()); } }