diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java index 508be4e572..30c570476a 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java @@ -162,7 +162,7 @@ class StandbyClient implements AutoCloseable { @Nullable String getHead() throws InterruptedException { - channel.writeAndFlush(new GetHeadRequest(clientId)); + channel.writeAndFlush(new GetHeadRequest(clientId, readTimeoutMs)); GetHeadResponse response = headQueue.poll(readTimeoutMs, TimeUnit.MILLISECONDS); diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java index dc562d2b32..1a1719c28b 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java @@ -64,7 +64,8 @@ class StandbyClientSyncExecution { RecordId remoteHead = getHead(); if (remoteHead == null) { - throw new IllegalStateException("Unable to fetch remote head"); + log.error("Unable to fetch remote head"); + return; } if (remoteHead.equals(store.getHead().getRecordId())) { diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetHeadRequest.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetHeadRequest.java index 24b41ac2aa..c60fea4432 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetHeadRequest.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetHeadRequest.java @@ -20,13 +20,18 @@ package org.apache.jackrabbit.oak.segment.standby.codec; public class GetHeadRequest { private final String clientId; + private final long timeout; - public GetHeadRequest(String clientId) { + public GetHeadRequest(String clientId, long timeout) { this.clientId = clientId; + this.timeout = timeout; } public String getClientId() { return clientId; } + public long getTimeout() { + return timeout; + } } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetHeadRequestEncoder.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetHeadRequestEncoder.java index adce433ba9..eb67976afe 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetHeadRequestEncoder.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetHeadRequestEncoder.java @@ -31,7 +31,7 @@ public class GetHeadRequestEncoder extends MessageToMessageEncoder out) throws Exception { log.debug("Sending request from client {} for current head", msg.getClientId()); - out.add(Messages.newGetHeadRequest(msg.getClientId())); + out.add(Messages.newGetHeadRequest(msg.getClientId(), msg.getTimeout())); } } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java index af9f09a209..45c83e0de1 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/Messages.java @@ -29,7 +29,7 @@ final class Messages { static final byte HEADER_REFERENCES = 0x03; - static final String GET_HEAD = "h"; + static final String GET_HEAD = "h."; static final String GET_SEGMENT = "s."; @@ -60,12 +60,12 @@ final class Messages { return builder.toString(); } - static String newGetHeadRequest(String clientId, boolean delimited) { - return newRequest(clientId, GET_HEAD, delimited); + static String newGetHeadRequest(String clientId, long timeout, boolean delimited) { + return newRequest(clientId, GET_HEAD + timeout, delimited); } - static String newGetHeadRequest(String clientId) { - return newGetHeadRequest(clientId, true); + static String newGetHeadRequest(String clientId, long timeout) { + return newGetHeadRequest(clientId, timeout, true); } static String newGetSegmentRequest(String clientId, String segmentId, boolean delimited) { diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java index 8f0906c307..61cc12b2d1 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoder.java @@ -38,9 +38,17 @@ public class RequestDecoder extends MessageToMessageDecoder { } else if (request.startsWith(Messages.GET_BLOB)) { log.debug("Parsed 'get blob' request"); out.add(new GetBlobRequest(Messages.extractClientFrom(msg), request.substring(Messages.GET_BLOB.length()))); - } else if (request.equalsIgnoreCase(Messages.GET_HEAD)) { + } else if (request.startsWith(Messages.GET_HEAD)) { log.debug("Parsed 'get head' message"); - out.add(new GetHeadRequest(Messages.extractClientFrom(msg))); + String timeoutStr = request.substring(Messages.GET_HEAD.length()); + long timeout = 0L; + try { + timeout = Long.parseLong(timeoutStr); + } catch (NumberFormatException e) { + log.debug("Found unrecognizable timeout {} in 'get head' message. Will move forward with timeout=0ms.", + timeoutStr); + } + out.add(new GetHeadRequest(Messages.extractClientFrom(msg), timeout)); } else if (request.startsWith(Messages.GET_SEGMENT)) { log.debug("Parsed 'get segment' message"); out.add(new GetSegmentRequest(Messages.extractClientFrom(msg), request.substring(Messages.GET_SEGMENT.length()))); diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java index a75787f5cd..f6c01e2226 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java @@ -17,6 +17,9 @@ package org.apache.jackrabbit.oak.segment.standby.server; +import static org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.readPersistedHeadWithRetry; + +import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.file.FileStore; class DefaultStandbyHeadReader implements StandbyHeadReader { @@ -28,8 +31,9 @@ class DefaultStandbyHeadReader implements StandbyHeadReader { } @Override - public String readHeadRecordId() { - return store.getRevisions().getPersistedHead().toString(); + public String readHeadRecordId(long timeout) { + RecordId persistedHead = readPersistedHeadWithRetry(store, timeout); + return persistedHead != null ? persistedHead.toString() : null; } } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java index 61dc9c94d7..74bfd5fbb7 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java @@ -18,12 +18,12 @@ package org.apache.jackrabbit.oak.segment.standby.server; import static com.google.common.collect.Lists.newArrayList; -import static org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.readSegmentWithRetry; import java.util.List; import java.util.UUID; import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; import org.apache.jackrabbit.oak.segment.file.FileStore; class DefaultStandbyReferencesReader implements StandbyReferencesReader { @@ -40,20 +40,21 @@ class DefaultStandbyReferencesReader implements StandbyReferencesReader { long msb = uuid.getMostSignificantBits(); long lsb = uuid.getLeastSignificantBits(); + SegmentId segmentId = store.getSegmentIdProvider().newSegmentId(msb, lsb); - Segment segment = readSegmentWithRetry(store, store.getSegmentIdProvider().newSegmentId(msb, lsb)); + if (store.containsSegment(segmentId)) { + Segment segment = store.readSegment(segmentId); - if (segment == null) { - return null; - } + List references = newArrayList(); - List references = newArrayList(); + for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) { + references.add(segment.getReferencedSegmentId(i).toString()); + } - for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) { - references.add(segment.getReferencedSegmentId(i).toString()); + return references; } - return references; + return null; } } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java index c680f3b38e..ab50b27fc5 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java @@ -17,13 +17,12 @@ package org.apache.jackrabbit.oak.segment.standby.server; -import static org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.readSegmentWithRetry; - import java.io.IOException; import java.util.UUID; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentId; import org.apache.jackrabbit.oak.segment.file.FileStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,24 +38,24 @@ class DefaultStandbySegmentReader implements StandbySegmentReader { } @Override - public byte[] readSegment(String segmentId) { - UUID uuid = UUID.fromString(segmentId); + public byte[] readSegment(String id) { + UUID uuid = UUID.fromString(id); long msb = uuid.getMostSignificantBits(); long lsb = uuid.getLeastSignificantBits(); - - Segment segment = readSegmentWithRetry(store, store.getSegmentIdProvider().newSegmentId(msb, lsb)); - - if (segment == null) { - return null; - } - - try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { - segment.writeTo(stream); - return stream.toByteArray(); - } catch (IOException e) { - log.warn("Error while reading segment content", e); - return null; + SegmentId segmentId = store.getSegmentIdProvider().newSegmentId(msb, lsb); + + if (store.containsSegment(segmentId)) { + Segment segment = store.readSegment(segmentId); + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + segment.writeTo(stream); + return stream.toByteArray(); + } catch (IOException e) { + log.warn("Error while reading segment content", e); + return null; + } } + + return null; } } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java index 686a06e563..cd0dcc6240 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java @@ -19,8 +19,8 @@ package org.apache.jackrabbit.oak.segment.standby.server; import java.util.concurrent.TimeUnit; -import org.apache.jackrabbit.oak.segment.Segment; -import org.apache.jackrabbit.oak.segment.SegmentId; +import com.google.common.base.Supplier; +import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.file.FileStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; public final class FileStoreUtil { private static final Logger log = LoggerFactory.getLogger(FileStoreUtil.class); + private static final long DEFAULT_SLEEP_TIME = 125L; private FileStoreUtil() { // Prevent instantiation @@ -37,14 +38,27 @@ public final class FileStoreUtil { return (int) Math.ceil((double) x / (double) y); } - static Segment readSegmentWithRetry(FileStore store, SegmentId id) { - for (int i = 0; i < 160; i++) { - if (store.containsSegment(id)) { - return store.readSegment(id); + static RecordId readPersistedHeadWithRetry(FileStore store, long timeout) { + Supplier headSupplier = () -> { + return store.getRevisions().getPersistedHead(); + }; + + if (timeout > DEFAULT_SLEEP_TIME) { + return readWithRetry(headSupplier, "persisted head", timeout); + } else { + return headSupplier.get(); + } + } + + static T readWithRetry(Supplier supplier, String supplied, long timeout) { + for (int i = 0; i < timeout / DEFAULT_SLEEP_TIME; i++) { + if (supplier.get() != null) { + return supplier.get(); } + try { - log.trace("Unable to read segment, waiting..."); - TimeUnit.MILLISECONDS.sleep(125); + log.trace("Unable to read {}, waiting...", supplied); + TimeUnit.MILLISECONDS.sleep(DEFAULT_SLEEP_TIME); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; @@ -52,5 +66,4 @@ public final class FileStoreUtil { } return null; } - } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java index ddd70f7834..f605008bd5 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandler.java @@ -42,7 +42,7 @@ class GetHeadRequestHandler extends SimpleChannelInboundHandler protected void channelRead0(ChannelHandlerContext ctx, GetHeadRequest msg) throws Exception { log.debug("Reading head for client {}", msg.getClientId()); - String id = reader.readHeadRecordId(); + String id = reader.readHeadRecordId(msg.getTimeout()); if (id == null) { log.debug("Head not found, discarding request from client {}", msg.getClientId()); diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java index 46e01cbe79..22ee2d03b8 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyHeadReader.java @@ -23,11 +23,13 @@ package org.apache.jackrabbit.oak.segment.standby.server; interface StandbyHeadReader { /** - * Read the head record ID. + * Tries to read the persisted head record ID in the specified time frame. + * @param timeout + * the length of time in which to (re)try reading persisted head * - * @return the head record ID or {@code null} if the head record ID can't be - * found. + * @return the head record ID or {@code null} if the persisted head record ID can't be + * timely found. */ - String readHeadRecordId(); + String readHeadRecordId(long timeout); } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java index 6a74e02d49..4d8464f8de 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java @@ -22,9 +22,9 @@ package org.apache.jackrabbit.oak.segment.standby; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeFalse; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -37,7 +37,6 @@ import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.api.PropertyState; import org.apache.jackrabbit.oak.api.Type; -import org.apache.jackrabbit.oak.commons.CIHelper; import org.apache.jackrabbit.oak.commons.junit.TemporaryPort; import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; import org.apache.jackrabbit.oak.segment.file.FileStore; @@ -125,6 +124,58 @@ public abstract class DataStoreTestBase extends TestBase { public void after() { proxy.close(); } + + @Test + public void testResilientSync() throws Exception { + final int blobSize = 5 * MB; + FileStore primary = getPrimary(); + FileStore secondary = getSecondary(); + + NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); + byte[] data = addTestContent(store, "server", blobSize); + + // run 1: unsuccessful + try ( + StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB); + StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000) + ) { + serverSync.start(); + // no persisted head on primary + // sync shouldn't be successful, but shouldn't throw exception either, + // timeout too low for TarMK flush thread to kick-in + cl.run(); + assertNotEquals(primary.getHead(), secondary.getHead()); + } + + // run 2: successful + try ( + StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB); + StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000) + ) { + serverSync.start(); + // this time persisted head will be available on primary + // waited at least 4s + 4s > 5s (TarMK flush thread run frequency) + cl.run(); + assertEquals(primary.getHead(), secondary.getHead()); + } + + assertTrue(primary.getStats().getApproximateSize() < MB); + assertTrue(secondary.getStats().getApproximateSize() < MB); + + PropertyState ps = secondary.getHead().getChildNode("root") + .getChildNode("server").getProperty("testBlob"); + assertNotNull(ps); + assertEquals(Type.BINARY.tag(), ps.getType().tag()); + Blob b = ps.getValue(Type.BINARY); + assertEquals(blobSize, b.length()); + byte[] testData = new byte[blobSize]; + try ( + InputStream blobInputStream = b.getNewStream() + ) { + ByteStreams.readFully(blobInputStream, testData); + assertArrayEquals(data, testData); + } + } @Test public void testSync() throws Exception { @@ -167,8 +218,6 @@ public abstract class DataStoreTestBase extends TestBase { */ @Test public void testSyncBigBlob() throws Exception { - assumeFalse(CIHelper.jenkins()); // FIXME OAK-6678: fails on Jenkins - final long blobSize = (long) (1 * GB); final int seed = 13; diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetHeadRequestEncoderTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetHeadRequestEncoderTest.java index 8cf395000f..6fc96ae46b 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetHeadRequestEncoderTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetHeadRequestEncoderTest.java @@ -28,9 +28,9 @@ public class GetHeadRequestEncoderTest { @Test public void encodeRequest() throws Exception { EmbeddedChannel channel = new EmbeddedChannel(new GetHeadRequestEncoder()); - channel.writeOutbound(new GetHeadRequest("clientId")); + channel.writeOutbound(new GetHeadRequest("clientId", 0L)); String message = (String) channel.readOutbound(); - assertEquals(newGetHeadRequest("clientId"), message); + assertEquals(newGetHeadRequest("clientId", 0L), message); } } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java index 466306962e..4f1d8e0250 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/RequestDecoderTest.java @@ -37,7 +37,7 @@ public class RequestDecoderTest { @Test public void shouldDecodeValidGetHeadRequests() throws Exception { EmbeddedChannel channel = new EmbeddedChannel(new RequestDecoder()); - channel.writeInbound(Messages.newGetHeadRequest("clientId", false)); + channel.writeInbound(Messages.newGetHeadRequest("clientId", 0L, false)); GetHeadRequest request = (GetHeadRequest) channel.readInbound(); assertEquals("clientId", request.getClientId()); } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java index 954a1e2c8e..27f9e099f9 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java @@ -19,8 +19,10 @@ package org.apache.jackrabbit.oak.segment.standby.server; import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import java.io.File; +import java.util.concurrent.TimeUnit; import org.apache.jackrabbit.oak.segment.file.FileStore; import org.junit.Rule; @@ -41,7 +43,16 @@ public class DefaultStandbyHeadReaderTest { try (FileStore store = newFileStore()) { store.flush(); DefaultStandbyHeadReader reader = new DefaultStandbyHeadReader(store); - assertEquals(store.getRevisions().getPersistedHead().toString(), reader.readHeadRecordId()); + assertEquals(store.getRevisions().getPersistedHead().toString(), reader.readHeadRecordId(0L)); + } + } + + @Test + public void shouldWaitForFlushAndReturnHeadSegmentId() throws Exception { + try (FileStore store = newFileStore()) { + DefaultStandbyHeadReader reader = new DefaultStandbyHeadReader(store); + assertNotNull(reader.readHeadRecordId(TimeUnit.MILLISECONDS.toMillis(10_000L))); + assertEquals(store.getRevisions().getPersistedHead().toString(), reader.readHeadRecordId(0L)); } } diff --git oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java index b999ef55a4..23d832a2c5 100644 --- oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java +++ oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetHeadRequestHandlerTest.java @@ -32,10 +32,10 @@ public class GetHeadRequestHandlerTest { @Test public void successfulReadsShouldGenerateResponses() throws Exception { StandbyHeadReader reader = mock(StandbyHeadReader.class); - when(reader.readHeadRecordId()).thenReturn("recordId"); + when(reader.readHeadRecordId(0L)).thenReturn("recordId"); EmbeddedChannel channel = new EmbeddedChannel(new GetHeadRequestHandler(reader)); - channel.writeInbound(new GetHeadRequest("clientId")); + channel.writeInbound(new GetHeadRequest("clientId", 0L)); GetHeadResponse response = (GetHeadResponse) channel.readOutbound(); assertEquals("recordId", response.getHeadRecordId()); assertEquals("clientId", response.getClientId()); @@ -44,10 +44,10 @@ public class GetHeadRequestHandlerTest { @Test public void unsuccessfulReadsShouldBeDiscarded() throws Exception { StandbyHeadReader reader = mock(StandbyHeadReader.class); - when(reader.readHeadRecordId()).thenReturn(null); + when(reader.readHeadRecordId(0L)).thenReturn(null); EmbeddedChannel channel = new EmbeddedChannel(new GetHeadRequestHandler(reader)); - channel.writeInbound(new GetHeadRequest("clientId")); + channel.writeInbound(new GetHeadRequest("clientId", 0L)); assertNull(channel.readOutbound()); }