diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java index dc562d2b32..1a1719c28b 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java @@ -64,7 +64,8 @@ class StandbyClientSyncExecution { RecordId remoteHead = getHead(); if (remoteHead == null) { - throw new IllegalStateException("Unable to fetch remote head"); + log.error("Unable to fetch remote head"); + return; } if (remoteHead.equals(store.getHead().getRecordId())) { diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java index a75787f5cd..f8da932149 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java @@ -17,6 +17,9 @@ package org.apache.jackrabbit.oak.segment.standby.server; +import static org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.readPersistedHeadWithRetry; + +import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.file.FileStore; class DefaultStandbyHeadReader implements StandbyHeadReader { @@ -29,7 +32,8 @@ class DefaultStandbyHeadReader implements StandbyHeadReader { @Override public String readHeadRecordId() { - return store.getRevisions().getPersistedHead().toString(); + RecordId persistedHead = readPersistedHeadWithRetry(store); + return persistedHead != null ? persistedHead.toString() : null; } } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java index 686a06e563..1ddcc5f9a6 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java @@ -19,6 +19,8 @@ package org.apache.jackrabbit.oak.segment.standby.server; import java.util.concurrent.TimeUnit; +import com.google.common.base.Supplier; +import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.Segment; import org.apache.jackrabbit.oak.segment.SegmentId; import org.apache.jackrabbit.oak.segment.file.FileStore; @@ -38,12 +40,33 @@ public final class FileStoreUtil { } static Segment readSegmentWithRetry(FileStore store, SegmentId id) { - for (int i = 0; i < 160; i++) { + Supplier segmentSupplier = () -> { if (store.containsSegment(id)) { return store.readSegment(id); } + + return null; + }; + + return readWithRetry(segmentSupplier, "segment"); + } + + static RecordId readPersistedHeadWithRetry(FileStore store) { + Supplier headSupplier = () -> { + return store.getRevisions().getPersistedHead(); + }; + + return readWithRetry(headSupplier, "persisted head"); + } + + static T readWithRetry(Supplier supplier, String supplied) { + for (int i = 0; i < 160; i++) { + if (supplier.get() != null) { + return supplier.get(); + } + try { - log.trace("Unable to read segment, waiting..."); + log.trace("Unable to read {}, waiting...", supplied); TimeUnit.MILLISECONDS.sleep(125); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -52,5 +75,4 @@ public final class FileStoreUtil { } return null; } - } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java index 6a74e02d49..4d8464f8de 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java @@ -22,9 +22,9 @@ package org.apache.jackrabbit.oak.segment.standby; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeFalse; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -37,7 +37,6 @@ import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; -import org.apache.jackrabbit.oak.commons.CIHelper; import org.apache.jackrabbit.oak.commons.junit.TemporaryPort; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; import org.apache.jackrabbit.oak.segment.file.FileStore; @@ -125,6 +124,58 @@ public abstract class DataStoreTestBase extends TestBase { public void after() { proxy.close(); } + + @Test + public void testResilientSync() throws Exception { + final int blobSize = 5 * MB; + FileStore primary = getPrimary(); + FileStore secondary = getSecondary(); + + NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); + byte[] data = addTestContent(store, "server", blobSize); + + // run 1: unsuccessful + try ( + StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB); + StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000) + ) { + serverSync.start(); + // no persisted head on primary + // sync shouldn't be successful, but shouldn't throw exception either, + // timeout too low for TarMK flush thread to kick-in + cl.run(); + assertNotEquals(primary.getHead(), secondary.getHead()); + } + + // run 2: successful + try ( + StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB); + StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000) + ) { + serverSync.start(); + // this time persisted head will be available on primary + // waited at least 4s + 4s > 5s (TarMK flush thread run frequency) + cl.run(); + assertEquals(primary.getHead(), secondary.getHead()); + } + + assertTrue(primary.getStats().getApproximateSize() < MB); + assertTrue(secondary.getStats().getApproximateSize() < MB); + + PropertyState ps = secondary.getHead().getChildNode("root") + .getChildNode("server").getProperty("testBlob"); + assertNotNull(ps); + assertEquals(Type.BINARY.tag(), ps.getType().tag()); + Blob b = ps.getValue(Type.BINARY); + assertEquals(blobSize, b.length()); + byte[] testData = new byte[blobSize]; + try ( + InputStream blobInputStream = b.getNewStream() + ) { + ByteStreams.readFully(blobInputStream, testData); + assertArrayEquals(data, testData); + } + } @Test public void testSync() throws Exception { @@ -167,8 +218,6 @@ public abstract class DataStoreTestBase extends TestBase { */ @Test public void testSyncBigBlob() throws Exception { - assumeFalse(CIHelper.jenkins()); // FIXME OAK-6678: fails on Jenkins - final long blobSize = (long) (1 * GB); final int seed = 13; diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java index 954a1e2c8e..902055f557 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java @@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.segment.standby.server; import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import java.io.File; @@ -44,5 +45,14 @@ public class DefaultStandbyHeadReaderTest { assertEquals(store.getRevisions().getPersistedHead().toString(), reader.readHeadRecordId()); } } + + @Test + public void shouldWaitForFlushAndReturnHeadSegmentId() throws Exception { + try (FileStore store = newFileStore()) { + DefaultStandbyHeadReader reader = new DefaultStandbyHeadReader(store); + assertNotNull(reader.readHeadRecordId()); + assertEquals(store.getRevisions().getPersistedHead().toString(), reader.readHeadRecordId()); + } + } }