Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java (revision 1769645) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java (working copy) @@ -77,14 +77,14 @@ private Channel channel; - StandbyClient(String clientId, boolean secure, int readTimeoutMs) { + StandbyClient(NioEventLoopGroup group, String clientId, boolean secure, int readTimeoutMs) { + this.group = group; this.clientId = clientId; this.secure = secure; this.readTimeoutMs = readTimeoutMs; } void connect(String host, int port) throws Exception { - group = new NioEventLoopGroup(); final SslContext sslContext; @@ -149,7 +149,6 @@ @Override public void close() { closeChannel(); - closeGroup(); } private void closeChannel() { @@ -163,17 +162,6 @@ } } - private void closeGroup() { - if (group == null) { - return; - } - if (group.shutdownGracefully(2, 15, TimeUnit.SECONDS).awaitUninterruptibly(20, TimeUnit.SECONDS)) { - log.debug("Group shut down"); - } else { - log.debug("Group shutdown timed out"); - } - } - @Nullable String getHead() throws InterruptedException { channel.writeAndFlush(new GetHeadRequest(clientId)); Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java (revision 1769645) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSync.java (working copy) @@ -23,6 +23,7 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.management.MBeanServer; @@ -31,6 +32,7 @@ import javax.net.ssl.SSLException; import com.google.common.base.Supplier; +import io.netty.channel.nio.NioEventLoopGroup; import org.apache.jackrabbit.oak.segment.file.FileStore; import org.apache.jackrabbit.oak.segment.standby.jmx.ClientStandbyStatusMBean; import org.apache.jackrabbit.oak.segment.standby.jmx.StandbyStatusMBean; @@ -74,6 +76,8 @@ private long syncEndTimestamp; + private final NioEventLoopGroup group; + public StandbyClientSync(String host, int port, FileStore store, boolean secure, int readTimeoutMs, boolean autoClean) throws SSLException { this.state = STATUS_INITIALIZING; this.lastSuccessfulRequest = -1; @@ -88,6 +92,7 @@ this.fileStore = store; String s = System.getProperty(CLIENT_ID_PROPERTY_NAME); this.observer = new CommunicationObserver((s == null || s.length() == 0) ? UUID.randomUUID().toString() : s); + group = new NioEventLoopGroup(); final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); try { @@ -111,6 +116,7 @@ } catch (Exception e) { log.error("can unregister standby status mbean", e); } + closeGroup(); observer.unregister(); state = STATUS_CLOSED; } @@ -134,7 +140,7 @@ try { long startTimestamp = System.currentTimeMillis(); - try (StandbyClient client = new StandbyClient(observer.getID(), secure, readTimeoutMs)) { + try (StandbyClient client = new StandbyClient(group, observer.getID(), secure, readTimeoutMs)) { client.connect(host, port); int genBefore = headGeneration(fileStore); @@ -248,4 +254,17 @@ return syncEndTimestamp; } + + private void closeGroup() { + if (group == null) { + return; + } + if (group.shutdownGracefully(2, 15, TimeUnit.SECONDS).awaitUninterruptibly(20, TimeUnit.SECONDS)) { + log.debug("Group shut down"); + } else { + log.debug("Group shutdown timed out"); + } + } + + } Index: src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java =================================================================== --- src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java (revision 1769645) +++ src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServerSync.java (working copy) @@ -132,7 +132,9 @@ public void close() { stop(); state = STATUS_CLOSING; - + if (server != null) { + server.close(); + } observer.unregister(); final MBeanServer jmxServer = ManagementFactory.getPlatformMBeanServer(); try { Index: src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java =================================================================== --- src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java (revision 1769645) +++ src/test/java/org/apache/jackrabbit/oak/segment/standby/BrokenNetworkIT.java (working copy) @@ -32,6 +32,8 @@ import org.apache.jackrabbit.oak.segment.test.TemporaryFileStore; import org.apache.jackrabbit.oak.segment.test.proxy.NetworkErrorProxy; import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.RuleChain; @@ -47,12 +49,24 @@ private TemporaryFileStore clientFileStore2 = new TemporaryFileStore(folder, true); + private static NetworkErrorProxy proxy; + @Rule public RuleChain chain = RuleChain.outerRule(folder) .around(serverFileStore) .around(clientFileStore1) .around(clientFileStore2); + @BeforeClass + public static void beforeClass() { + proxy = new NetworkErrorProxy(getProxyPort(), getServerHost(), getServerPort()); + } + + @AfterClass + public static void afterClass() { + proxy.close(); + } + @Test public void testProxy() throws Exception { useProxy(false); @@ -133,13 +147,12 @@ storeS.flush(); // this speeds up the test a little bit... try ( - NetworkErrorProxy p = new NetworkErrorProxy(getProxyPort(), getServerHost(), getServerPort()); StandbyServerSync serverSync = new StandbyServerSync(getServerPort(), storeS, ssl); StandbyClientSync clientSync = newStandbyClientSync(storeC, getProxyPort(), ssl); ) { - p.skipBytes(skipPosition, skipBytes); - p.flipByte(flipPosition); - p.connect(); + proxy.skipBytes(skipPosition, skipBytes); + proxy.flipByte(flipPosition); + proxy.connect(); serverSync.start(); @@ -149,7 +162,7 @@ assertFalse("stores are not expected to be equal", storeS.getHead().equals(storeC.getHead())); assertEquals(storeC2.getHead(), storeC.getHead()); - p.reset(); + proxy.reset(); if (intermediateChange) { addTestContent(store, "server2"); storeS.flush(); Index: src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java =================================================================== --- src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java (revision 1769645) +++ src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java (working copy) @@ -43,12 +43,16 @@ import org.apache.jackrabbit.oak.spi.commit.EmptyHook; import org.apache.jackrabbit.oak.spi.state.NodeBuilder; import org.apache.jackrabbit.oak.spi.state.NodeStore; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; public abstract class DataStoreTestBase extends TestBase { private static final int MB = 1024 * 1024; + private static NetworkErrorProxy proxy; + abstract FileStore getPrimary(); abstract FileStore getSecondary(); @@ -70,6 +74,16 @@ return data; } + @BeforeClass + public static void beforeClass() { + proxy = new NetworkErrorProxy(getProxyPort(), getServerHost(), getServerPort()); + } + + @AfterClass + public static void afterClass() { + proxy.close(); + } + @Test public void testSync() throws Exception { final int blobSize = 5 * MB; @@ -174,13 +188,12 @@ NodeStore store = SegmentNodeStoreBuilders.builder(primary).build(); byte[] data = addTestContent(store, "server", blobSize); try ( - NetworkErrorProxy p = new NetworkErrorProxy(getProxyPort(), getServerHost(), getServerPort()); StandbyServerSync serverSync = new StandbyServerSync(getServerPort(), primary); StandbyClientSync clientSync = newStandbyClientSync(secondary, getProxyPort()) ) { - p.skipBytes(skipPosition, skipBytes); - p.flipByte(flipPosition); - p.connect(); + proxy.skipBytes(skipPosition, skipBytes); + proxy.flipByte(flipPosition); + proxy.connect(); serverSync.start(); primary.flush(); @@ -191,7 +204,7 @@ if (!storesShouldBeEqual()) { assertFalse("stores are not expected to be equal", primary.getHead().equals(secondary.getHead())); } - p.reset(); + proxy.reset(); if (intermediateChange) { blobSize = 2 * MB; data = addTestContent(store, "server", blobSize);