Index: oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java =================================================================== --- oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java (revision 1733236) +++ oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.java (working copy) @@ -23,13 +23,17 @@ import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetSegmentReq; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.util.concurrent.EventExecutorGroup; import org.apache.jackrabbit.oak.api.Blob; import org.apache.jackrabbit.oak.plugins.segment.RecordId; import org.apache.jackrabbit.oak.plugins.segment.Segment; @@ -42,51 +46,93 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SegmentLoaderHandler extends ChannelInboundHandlerAdapter - implements RemoteSegmentLoader { +public class SegmentLoaderHandler extends ChannelInboundHandlerAdapter implements RemoteSegmentLoader { - private static final Logger log = LoggerFactory - .getLogger(SegmentLoaderHandler.class); + private static final Logger log = LoggerFactory.getLogger(SegmentLoaderHandler.class); + private static final AtomicInteger threadId = new AtomicInteger(); + private final StandbyStore store; private final String clientID; private final RecordId head; - private final EventExecutorGroup loaderExecutor; private final AtomicBoolean running; private final int readTimeoutMs; private final boolean autoClean; - private ChannelHandlerContext ctx; + private volatile ChannelHandlerContext ctx; - final BlockingQueue segment = new LinkedBlockingQueue(); + private final BlockingQueue segment = new LinkedBlockingQueue(); - public SegmentLoaderHandler(final StandbyStore store, RecordId head, - EventExecutorGroup loaderExecutor, - String clientID, AtomicBoolean running, int readTimeoutMs, boolean autoClean) { + // Use a separate thread for sync'ing. Leave the I/O thread free to process + // I/O requests. + private final ExecutorService syncExecutor; + + public SegmentLoaderHandler(StandbyStore store, RecordId head, String clientID, AtomicBoolean running, int readTimeoutMs, boolean autoClean) { this.store = store; this.head = head; - this.loaderExecutor = loaderExecutor; this.clientID = clientID; this.running = running; this.readTimeoutMs = readTimeoutMs; this.autoClean = autoClean; + + this.syncExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName("Cold Standby Sync " + threadId.incrementAndGet()); + return t; + } + + }); } @Override - public void channelActive(ChannelHandlerContext ctx) { + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; - initSync(); } @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) - throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.error("Exception caught, closing channel.", cause); + close(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + syncExecutor.shutdown(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof SegmentReply) { - segment.offer((SegmentReply) evt); + onSegmentReply((SegmentReply) evt); } + + if (evt instanceof String) { + onCommand((String) evt); + } } - private void initSync() { + private void onSegmentReply(SegmentReply reply) { + // Offer the reply from the I/O thread, unblocking the sync thread. + segment.offer(reply); + } + + private void onCommand(String command) { + if (command.equals("sync")) { + syncExecutor.submit(new Runnable() { + + @Override + public void run() { + sync(); + } + + }); + } + } + + private void sync() { log.debug("new head id " + head); long t = System.currentTimeMillis(); long preSyncSize = -1; @@ -142,23 +188,20 @@ @Override public Segment readSegment(final String id) { + // Use the I/O thread to write the request to the server ctx.writeAndFlush(newGetSegmentReq(this.clientID, id)); + // Wait on the sync thread for the response. return getSegment(id); } @Override public Blob readBlob(String blobId) { + // Use the I/O thread to write the request to the server ctx.writeAndFlush(newGetBlobReq(this.clientID, blobId)); + // Wait on the sync thread for the response. return getBlob(blobId); } - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - log.error("Exception caught, closing channel.", cause); - close(); - } - private Segment getSegment(final String id) { return getReply(id, SegmentReply.SEGMENT).getSegment(); } @@ -172,12 +215,14 @@ try { for (;;) { try { - SegmentReply r = segment.poll(readTimeoutMs, - TimeUnit.MILLISECONDS); + // Block the sync thread for a response from the server. + SegmentReply r = segment.poll(readTimeoutMs, TimeUnit.MILLISECONDS); + if (r == null) { log.warn("timeout waiting for {}", id); return SegmentReply.empty(); } + if (r.getType() == type) { switch (r.getType()) { case SegmentReply.SEGMENT: @@ -211,8 +256,7 @@ @Override public boolean isClosed() { - return (loaderExecutor != null && (loaderExecutor.isShuttingDown() || loaderExecutor - .isShutdown())); + return !ctx.channel().isActive(); } @Override Index: oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java =================================================================== --- oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java (revision 1733236) +++ oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClient.java (working copy) @@ -44,8 +44,6 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.CharsetUtil; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; import org.apache.jackrabbit.oak.plugins.segment.SegmentStore; import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder; import org.apache.jackrabbit.oak.plugins.segment.standby.jmx.ClientStandbyStatusMBean; @@ -70,7 +68,6 @@ private final CommunicationObserver observer; private StandbyClientHandler handler; private EventLoopGroup group; - private EventExecutorGroup executor; private SslContext sslContext; private boolean active = false; private int failedRequests; @@ -142,7 +139,6 @@ return; } state = STATUS_STARTING; - executor = new DefaultEventExecutorGroup(4); handler = new StandbyClientHandler(this.store, observer, running, readTimeoutMs, autoClean); group = new NioEventLoopGroup(); @@ -167,7 +163,7 @@ p.addLast(new StringEncoder(CharsetUtil.UTF_8)); p.addLast(new SnappyFramedDecoder(true)); p.addLast(new RecordIdDecoder(store)); - p.addLast(executor, handler); + p.addLast(handler); } }); state = STATUS_RUNNING; @@ -200,10 +196,6 @@ handler.close(); handler = null; } - if (executor != null && !executor.isShuttingDown()) { - executor.shutdownGracefully(0, 1, TimeUnit.SECONDS) - .syncUninterruptibly(); - } if (group != null && !group.isShuttingDown()) { group.shutdownGracefully(0, 1, TimeUnit.SECONDS) .syncUninterruptibly(); Index: oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java =================================================================== --- oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java (revision 1733236) +++ oak-tarmk-standby/src/main/java/org/apache/jackrabbit/oak/plugins/segment/standby/client/StandbyClientHandler.java (working copy) @@ -21,14 +21,11 @@ import static org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages.newGetHeadReq; import java.io.Closeable; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.ReadTimeoutHandler; -import io.netty.util.concurrent.DefaultEventExecutorGroup; -import io.netty.util.concurrent.EventExecutorGroup; import org.apache.jackrabbit.oak.plugins.segment.RecordId; import org.apache.jackrabbit.oak.plugins.segment.standby.codec.RecordIdDecoder; import org.apache.jackrabbit.oak.plugins.segment.standby.codec.ReplyDecoder; @@ -37,8 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StandbyClientHandler extends SimpleChannelInboundHandler - implements Closeable { +public class StandbyClientHandler extends SimpleChannelInboundHandler implements Closeable { private static final Logger log = LoggerFactory .getLogger(StandbyClientHandler.class); @@ -49,11 +45,7 @@ private final int readTimeoutMs; private final boolean autoClean; - private EventExecutorGroup loaderExecutor; - - public StandbyClientHandler(final StandbyStore store, - CommunicationObserver observer, AtomicBoolean running, - int readTimeoutMs, boolean autoClean) { + public StandbyClientHandler(final StandbyStore store, CommunicationObserver observer, AtomicBoolean running, int readTimeoutMs, boolean autoClean) { this.store = store; this.observer = observer; this.running = running; @@ -69,8 +61,7 @@ } @Override - protected void channelRead0(ChannelHandlerContext ctx, RecordId msg) - throws Exception { + protected void channelRead0(ChannelHandlerContext ctx, RecordId msg) throws Exception { setHead(ctx, msg); }; @@ -80,7 +71,6 @@ } synchronized void setHead(ChannelHandlerContext ctx, RecordId head) { - if (store.getHead().getRecordId().equals(head)) { // all sync'ed up log.debug("no changes on sync."); @@ -93,28 +83,19 @@ ctx.pipeline().remove(this); ctx.pipeline().addLast(new ReplyDecoder(store)); - loaderExecutor = new DefaultEventExecutorGroup(4); - SegmentLoaderHandler h2 = new SegmentLoaderHandler(store, head, - loaderExecutor, this.observer.getID(), running, readTimeoutMs, - autoClean); - ctx.pipeline().addLast(loaderExecutor, h2); - - h2.channelActive(ctx); - log.debug("updating current head finished"); + ctx.pipeline().addLast(new SegmentLoaderHandler(store, head, this.observer.getID(), running, readTimeoutMs, autoClean)); + ctx.pipeline().fireUserEventTriggered("sync"); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("Exception caught, closing channel.", cause); close(); } @Override - public synchronized void close() { - if (loaderExecutor != null && !loaderExecutor.isShuttingDown()) { - loaderExecutor.shutdownGracefully(0, 1, TimeUnit.SECONDS) - .syncUninterruptibly(); - } + public void close() { + // This handler doesn't own resources to release } + }