Index: oak-segment-tar/pom.xml =================================================================== --- oak-segment-tar/pom.xml (revision 1759750) +++ oak-segment-tar/pom.xml (working copy) @@ -35,7 +35,7 @@ 1.5.5 - 4.0.23.Final + 4.0.41.Final Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java (revision 1759750) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java (working copy) @@ -91,15 +91,20 @@ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof SegmentReply) { - onSegmentReply((SegmentReply) evt); - } - if (evt instanceof String) { onCommand((String) evt); } } + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof SegmentReply) { + onSegmentReply((SegmentReply) msg); + } else { + ctx.fireChannelRead(msg); + } + } + private void onSegmentReply(SegmentReply reply) { // Offer the reply from the I/O thread, unblocking the sync thread. segment.offer(reply); Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java =================================================================== --- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java (revision 1759750) +++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java (working copy) @@ -38,16 +38,18 @@ public class ReplyDecoder extends ReplayingDecoder { + private static final int REPLY_HEADER_SIZE = 25; + public enum DecodingState { HEADER, SEGMENT, BLOB } - private static final Logger log = LoggerFactory - .getLogger(ReplyDecoder.class); + private static final Logger log = LoggerFactory.getLogger(ReplyDecoder.class); private final StandbyStore store; private int length = -1; + private byte type = -1; public ReplyDecoder(StandbyStore store) { @@ -62,79 +64,96 @@ } @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, - List out) throws Exception { - + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { switch (state()) { - case HEADER: { - length = in.readInt(); - type = in.readByte(); - switch (type) { - case Messages.HEADER_SEGMENT: - checkpoint(DecodingState.SEGMENT); - break; - case Messages.HEADER_BLOB: - checkpoint(DecodingState.BLOB); - break; - default: - throw new Exception("Unknown type: " + type); + case HEADER: { + log.debug("Parsing header"); + length = in.readInt(); + type = in.readByte(); + switch (type) { + case Messages.HEADER_SEGMENT: + checkpoint(DecodingState.SEGMENT); + break; + case Messages.HEADER_BLOB: + checkpoint(DecodingState.BLOB); + break; + default: + throw new Exception("Unknown type: " + type); + } + return; } - return; - } - case SEGMENT: { - Segment s = decodeSegment(in, length, type); - if (s != null) { - out.add(SegmentReply.empty()); - ctx.fireUserEventTriggered(new SegmentReply(s)); - reset(); + case SEGMENT: { + log.debug("Parsing segment"); + Segment s = decodeSegment(in, length, type); + if (s != null) { + out.add(new SegmentReply(s)); + reset(); + } + return; } - return; - } - case BLOB: { - IdArrayBasedBlob b = decodeBlob(in, length, type); - if (b != null) { - out.add(SegmentReply.empty()); - ctx.fireUserEventTriggered(new SegmentReply(b)); - reset(); + case BLOB: { + log.debug("Parsing blob"); + IdArrayBasedBlob b = decodeBlob(in, length, type); + if (b != null) { + out.add(new SegmentReply(b)); + reset(); + } + return; } - return; - } - default: - throw new Exception("Unknown decoding state: " + state()); + default: + log.error("Message state unknown"); + throw new Exception("Unknown decoding state: " + state()); } } private Segment decodeSegment(ByteBuf in, int len, byte type) { + log.debug("Decoding segment, length={}, type={}", len - REPLY_HEADER_SIZE, type); + long msb = in.readLong(); long lsb = in.readLong(); long hash = in.readLong(); + if (log.isDebugEnabled()) { + log.debug("Decoding segment, id={}", new UUID(msb, lsb)); + } + // #readBytes throws a 'REPLAY' exception if there are not enough bytes // available for reading - ByteBuf data = in.readBytes(len - 25); - byte[] segment; - if (data.hasArray()) { - segment = data.array(); - } else { - segment = new byte[len - 25]; - in.readBytes(segment); + byte[] segment = readSegmentBytes(in.readBytes(len - REPLY_HEADER_SIZE)); + + if (log.isDebugEnabled()) { + log.debug("Verifying segment, id={}", new UUID(msb, lsb)); } Hasher hasher = Hashing.murmur3_32().newHasher(); long check = hasher.putBytes(segment).hash().padToLong(); + if (hash == check) { SegmentId id = store.newSegmentId(msb, lsb); - Segment s = store.newSegment(id, ByteBuffer.wrap(segment)); - log.debug("received segment with id {} and size {}", id, s.size()); - return s; + log.debug("Segment verified, id={}", id); + return store.newSegment(id, ByteBuffer.wrap(segment)); } - log.debug("received corrupted segment {}, ignoring", new UUID(msb, lsb)); + + if (log.isDebugEnabled()) { + log.debug("Segment corrupted, id={}", new UUID(msb, lsb)); + } + return null; } + private byte[] readSegmentBytes(ByteBuf data) { + if (data.hasArray()) { + return data.array(); + } + + byte[] result = new byte[data.readableBytes()]; + data.readBytes(result); + return result; + } + private IdArrayBasedBlob decodeBlob(ByteBuf in, int length, byte type) { int inIdLen = in.readInt(); byte[] bid = new byte[inIdLen];