diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java index f9d9e2bcab..508be4e572 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java @@ -212,4 +212,8 @@ class StandbyClient implements AutoCloseable { return response.getReferences(); } + public int getReadTimeoutMs() { + return readTimeoutMs; + } + } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java index 55da9fac60..3b41318f7a 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java @@ -103,7 +103,7 @@ public final class StandbyClientSync implements ClientStandbyStatusMBean, Runnab try { jmxServer.registerMBean(new StandardMBean(this, ClientStandbyStatusMBean.class), new ObjectName(this.getMBeanName())); } catch (Exception e) { - log.error("can register standby status mbean", e); + log.error("cannot register standby status mbean", e); } } @@ -119,7 +119,7 @@ public final class StandbyClientSync implements ClientStandbyStatusMBean, Runnab try { jmxServer.unregisterMBean(new ObjectName(this.getMBeanName())); } catch (Exception e) { - log.error("can unregister standby status mbean", e); + log.error("cannot unregister standby status mbean", e); } closeGroup(); observer.unregister(); diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java index 26afd4ec46..dc562d2b32 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java @@ -78,7 +78,7 @@ class StandbyClientSyncExecution { compareAgainstBaseState(current, before, builder); boolean ok = store.getRevisions().setHead(before.getRecordId(), remoteHead); store.flush(); - log.debug("updated head state successfully: {} in {}ms.", ok, System.currentTimeMillis() - t); + log.info("updated head state successfully: {} in {}ms.", ok, System.currentTimeMillis() - t); } @Nullable diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java index afcc729a2d..6aa9e5b332 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java @@ -204,7 +204,8 @@ class StandbyDiff implements NodeStateDiff { InputStream in = client.getBlob(blobId); if (in == null) { - throw new IllegalStateException("Unable to load remote blob " + blobId + " at " + path + "#" + pName); + throw new IllegalStateException("Unable to load remote blob " + blobId + " at " + path + "#" + pName + + " in " + client.getReadTimeoutMs() + "ms. Please increase the timeout and try again."); } try { diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java index fd785311b9..4f6a57cd83 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java @@ -17,6 +17,8 @@ package org.apache.jackrabbit.oak.segment.standby.codec; +import static org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.roundDiv; + import java.io.InputStream; import java.io.PushbackInputStream; @@ -123,8 +125,8 @@ public class ChunkedBlobStream implements ChunkedInput { decorated = decorateRawBuffer(allocator, buffer); offset += written; - log.info("Sending chunk {}/{} of size {} to client {}", roundDiv(offset, chunkSize), - roundDiv(length, chunkSize), written, clientId); + log.debug("Sending chunk {}/{} of size {} from blob {} to client {}", roundDiv(offset, chunkSize), + roundDiv(length, chunkSize), written, blobId, clientId); release = false; return decorated; @@ -159,10 +161,6 @@ public class ChunkedBlobStream implements ChunkedInput { return out; } - private static int roundDiv(long x, int y) { - return (int) Math.ceil((double) x / (double) y); - } - private byte createMask(int bytesRead) { byte mask = 0; if (offset == 0) { diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java index 877fdabe19..7ff0a53462 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java @@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.segment.standby.codec; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; +import static org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.roundDiv; import java.io.File; import java.io.FileInputStream; @@ -70,6 +71,8 @@ public class ResponseDecoder extends ByteToMessageDecoder { } } + private int blobChunkSize; + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { int length = in.readInt(); @@ -122,7 +125,7 @@ public class ResponseDecoder extends ByteToMessageDecoder { out.add(new GetSegmentResponse(null, segmentId, data)); } - private static void decodeGetBlobResponse(int length, ByteBuf in, List out) throws IOException { + private void decodeGetBlobResponse(int length, ByteBuf in, List out) throws IOException { byte mask = in.readByte(); long blobLength = in.readLong(); @@ -134,6 +137,8 @@ public class ResponseDecoder extends ByteToMessageDecoder { // START_CHUNK flag enabled if ((mask & (1 << 0)) != 0) { + blobChunkSize = in.readableBytes() - 8; + if (tempFile.exists()) { log.debug("Detected previous incomplete transfer for {}. Cleaning up...", blobId); Files.delete(tempFile.toPath()); @@ -142,7 +147,8 @@ public class ResponseDecoder extends ByteToMessageDecoder { long hash = in.readLong(); - log.debug("Received chunk of size {} from blob {} ", in.readableBytes(), blobId); + log.debug("Received chunk {}/{} of size {} from blob {}", roundDiv(tempFile.length() + in.readableBytes(), blobChunkSize), + roundDiv(blobLength, blobChunkSize), in.readableBytes(), blobId); byte[] chunkData = new byte[in.readableBytes()]; in.readBytes(chunkData); @@ -164,7 +170,8 @@ public class ResponseDecoder extends ByteToMessageDecoder { FileInputStream fis = new DeleteOnCloseFileInputStream(tempFile); out.add(new GetBlobResponse(null, blobId, fis, fis.getChannel().size())); } else { - log.debug("Size mismatch for blob {}", blobId); + log.debug("Blob {} discarded due to size mismatch. Expected size: {}, actual size: {} ", blobId, + blobLength, tempFile.length()); } } } diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java index a9409e0a1b..61dc9c94d7 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java @@ -25,13 +25,9 @@ import java.util.UUID; import org.apache.jackrabbit.oak.segment.Segment; import org.apache.jackrabbit.oak.segment.file.FileStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; class DefaultStandbyReferencesReader implements StandbyReferencesReader { - private static final Logger log = LoggerFactory.getLogger(DefaultStandbyReferencesReader.class); - private final FileStore store; DefaultStandbyReferencesReader(FileStore store) { diff --git oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java index bb4dbac6ed..686a06e563 100644 --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java @@ -25,7 +25,7 @@ import org.apache.jackrabbit.oak.segment.file.FileStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class FileStoreUtil { +public final class FileStoreUtil { private static final Logger log = LoggerFactory.getLogger(FileStoreUtil.class); @@ -33,6 +33,10 @@ final class FileStoreUtil { // Prevent instantiation } + public static int roundDiv(long x, int y) { + return (int) Math.ceil((double) x / (double) y); + } + static Segment readSegmentWithRetry(FileStore store, SegmentId id) { for (int i = 0; i < 160; i++) { if (store.containsSegment(id)) {