Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java (revision 1725280) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/TCPBroadcaster.java (working copy) @@ -36,8 +36,8 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; @@ -69,7 +69,7 @@ private String ownKeyUUID = UUID.randomUUID().toString(); private byte[] ownKey = ownKeyUUID.getBytes(UTF8); - private volatile boolean stop; + private final AtomicBoolean stop = new AtomicBoolean(false); public TCPBroadcaster(String config) { LOG.info("Init " + config); @@ -201,7 +201,7 @@ } void accept() { - while (!stop) { + while (isRunning()) { try { final Socket socket = serverSocket.accept(); Runnable r = new Runnable() { @@ -238,7 +238,7 @@ } catch (SocketTimeoutException e) { // ignore } catch (IOException e) { - if (!stop) { + if (isRunning()) { LOG.warn("Receive failed", e); } // ignore @@ -253,7 +253,7 @@ } void discover() { - while (!stop) { + while (isRunning()) { DynamicBroadcastConfig b = broadcastConfig; if (b != null) { readClients(b); @@ -260,14 +260,18 @@ } for (Client c : clients.values()) { c.tryConnect(); - if (stop) { + if (!isRunning()) { break; } } - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - // ignore + synchronized (stop) { + if (isRunning()) { + try { + stop.wait(2000); + } catch (InterruptedException e) { + // ignore + } + } } } } @@ -304,10 +308,10 @@ } void send() { - while (!stop) { + while (isRunning()) { try { ByteBuffer buff = sendBuffer.poll(10, TimeUnit.MILLISECONDS); - if (buff != null && !stop) { + if (buff != null && isRunning()) { sendBuffer(buff); } } catch (InterruptedException e) { @@ -339,7 +343,7 @@ buff.get(data); for (Client c : clients.values()) { c.send(data); - if (stop) { + if (!isRunning()) { break; } } @@ -357,9 +361,12 @@ @Override public void close() { - if (!stop) { + if (isRunning()) { LOG.debug("Stopping"); - this.stop = true; + synchronized (stop) { + stop.set(true); + stop.notifyAll(); + } try { serverSocket.close(); } catch (IOException e) { @@ -383,8 +390,8 @@ } } - public boolean isRunning() { - return !stop; + public final boolean isRunning() { + return !stop.get(); } static class Client { Index: oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java =================================================================== --- oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java (revision 1725280) +++ oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/ConcurrentAddNodesClusterIT.java (working copy) @@ -205,7 +205,18 @@ } } + @Ignore @Test + public void loopAddNodes() throws Exception { + for (int i = 0; i < 1000; i++) { + addNodes(); + after(); + before(); + } + } + + + @Test public void addNodes() throws Exception { for (int i = 0; i < 2; i++) { DocumentMK mk = new DocumentMK.Builder()