Index: oak-segment-tar/pom.xml
===================================================================
--- oak-segment-tar/pom.xml (revision 1759750)
+++ oak-segment-tar/pom.xml (working copy)
@@ -35,7 +35,7 @@
1.5.5
- 4.0.23.Final
+ 4.0.41.Final
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java (revision 1759750)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/SegmentLoaderHandler.java (working copy)
@@ -91,15 +91,20 @@
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof SegmentReply) {
- onSegmentReply((SegmentReply) evt);
- }
-
if (evt instanceof String) {
onCommand((String) evt);
}
}
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (msg instanceof SegmentReply) {
+ onSegmentReply((SegmentReply) msg);
+ } else {
+ ctx.fireChannelRead(msg);
+ }
+ }
+
private void onSegmentReply(SegmentReply reply) {
// Offer the reply from the I/O thread, unblocking the sync thread.
segment.offer(reply);
Index: oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java
===================================================================
--- oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java (revision 1759750)
+++ oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ReplyDecoder.java (working copy)
@@ -38,16 +38,18 @@
public class ReplyDecoder extends ReplayingDecoder {
+ private static final int REPLY_HEADER_SIZE = 25;
+
public enum DecodingState {
HEADER, SEGMENT, BLOB
}
- private static final Logger log = LoggerFactory
- .getLogger(ReplyDecoder.class);
+ private static final Logger log = LoggerFactory.getLogger(ReplyDecoder.class);
private final StandbyStore store;
private int length = -1;
+
private byte type = -1;
public ReplyDecoder(StandbyStore store) {
@@ -62,79 +64,96 @@
}
@Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in,
- List