Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java (revision 1727391) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/PersistentCache.java (revision ) @@ -27,11 +27,8 @@ import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore; import org.apache.jackrabbit.oak.plugins.document.DocumentStore; -import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig; -import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster; -import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.InMemoryBroadcaster; -import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster; -import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.UDPBroadcaster; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.*; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.netty.tcp.TcpMulticastBroadcaster; import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; import org.h2.mvstore.FileStore; import org.h2.mvstore.MVMap; @@ -205,6 +202,9 @@ } else if (broadcast.startsWith("tcp:")) { String config = broadcast.substring("tcp:".length(), broadcast.length()); broadcaster = new TCPBroadcaster(config); + } else if (broadcast.startsWith("nettyTcp:")) { + String config = broadcast.substring("nettyTcp:".length(), broadcast.length()); + broadcaster = new TcpMulticastBroadcaster(config); } else { throw new IllegalArgumentException("Unknown broadcaster type " + broadcast); } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastServer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastServer.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastServer.java (revision ) @@ -0,0 +1,85 @@ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.netty.tcp; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster; + +import javax.net.ssl.SSLException; +import java.security.cert.CertificateException; +import java.util.List; + +/** + * Created by suter on 25/01/16. + */ +public final class TcpMulticastServer { + + private final EventLoopGroup bossGroup = new NioEventLoopGroup(1); + //TODO make amount configurable + private final EventLoopGroup workerGroup = new NioEventLoopGroup(16); + private ServerBootstrap bootstrap; + private int port = 0; + private Channel channel; + + public TcpMulticastServer(final List listeners, final boolean hasSSL) { + SslContext sslCtx = null; + if (hasSSL) { + try { + //FIXME needs proper integration with certificate management + SelfSignedCertificate ssc = new SelfSignedCertificate(); + sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); + } catch (CertificateException e) { + //TODO + } catch (SSLException e) { + //TODO + } + } + final TcpMulticastServerInitializer serverInitializer = new TcpMulticastServerInitializer(sslCtx, listeners); + bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_REUSEADDR, true) + //TODO make amount configurable + .option(ChannelOption.SO_TIMEOUT, 100) + //.option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 1024) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(serverInitializer) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + } + + public void bind(final int port) throws InterruptedException { + this.port = port; + this.channel = this.bootstrap.bind(port).sync().channel(); + } + + public void syncStart() throws InterruptedException { + this.channel.closeFuture().sync(); + } + + public void start(final int port) throws InterruptedException { + this.port = port; + this.channel = this.bootstrap.bind(port).sync().channel(); + } + + public void stop() { + channel.close(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + + public int getPort() { + return port; + } + +} + + Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastClient.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastClient.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastClient.java (revision ) @@ -0,0 +1,144 @@ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.netty.tcp; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOption; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import javax.net.ssl.SSLException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * Created by suter on 25/01/16. + */ +public final class TcpMulticastClient { + + //TODO make configurable + private static final int INIT_WAIT_TIME_MS = 100; + private Bootstrap bootstrap; + private NioEventLoopGroup nioEventLoopGroup; + private ChannelGroup recipients; + + //TODO make amount configurable + private static final int MAX_BUFFER_SIZE = 128; + private final ArrayBlockingQueue sendBuffer = new ArrayBlockingQueue(MAX_BUFFER_SIZE); + private final Map servers = new ConcurrentHashMap(); + private static final ByteBuffer SHUTDOWN_MSG = ByteBuffer.wrap("SHUTDOWN NOW".getBytes()); + + public TcpMulticastClient(final boolean hasSSL) { + //TODO make amount configurable + nioEventLoopGroup = new NioEventLoopGroup(16); + bootstrap = new Bootstrap(); + + SslContext sslCtx = null; + try { + if (hasSSL) { + sslCtx = SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); + } + } catch (SSLException e) { + //TODO + } + final ChannelInboundHandlerAdapter channelInitializer = new TcpMulticastClientInitializer(sslCtx); + recipients = new DefaultChannelGroup(nioEventLoopGroup.next()); + bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + //TODO make amount configurable + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024) + .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024) + //.option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 1024) + .handler(channelInitializer); + nioEventLoopGroup.submit(new SendThread()); + } + + public void close() { + sendBuffer.add(SHUTDOWN_MSG); + recipients.close().awaitUninterruptibly(); + nioEventLoopGroup.shutdownGracefully(); + } + + public void add(String hostPort, byte[] id) { + servers.put(hostPort, id); + } + + public void tryConnect(){ + nioEventLoopGroup.submit(new Runnable() { + @Override + public void run() { + //give the server some time before we try to connect + try { + TimeUnit.MILLISECONDS.sleep(INIT_WAIT_TIME_MS); + } catch (InterruptedException e) { + //do nothing + } + for (Map.Entry server : servers.entrySet()) { + connect(server.getKey(), server.getValue()); + } + } + }); + } + + public void connect(String hostPort, byte[] id) { + int index = hostPort.lastIndexOf(':'); + if (index >= 0) { + String host = hostPort.substring(0, index); + int port = Integer.parseInt(hostPort.substring(index + 1)); + try { + Channel ch = bootstrap.connect(host, port).sync().channel(); + ch.writeAndFlush(id); + recipients.add(ch); + } catch (Exception e) { + //do nothing in case there is no server + } + } + } + + public void sendDirect(final byte[] data) { + recipients.writeAndFlush(data); + } + + public void sendBuffer(final ByteBuffer byteBuffer) { + sendBuffer.offer(byteBuffer); + } + + + private class SendThread implements Runnable { + @Override + public void run() { + ByteBuffer buffer; + try { + int flusherCount = 0; + while ((buffer = sendBuffer.take()) != SHUTDOWN_MSG) { + if (buffer != null) { + recipients.write(buffer.array()); + } + //Only flush every 16th message to carry for network latency + //TODO make amount configurable + if (flusherCount % 16 == 0){ + recipients.flush(); + flusherCount = 0; + } else { + flusherCount++; + } + } + } catch (InterruptedException e) { + //do nothing + } + } + } +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastClientInitializer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastClientInitializer.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastClientInitializer.java (revision ) @@ -0,0 +1,57 @@ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.netty.tcp; + +import io.netty.channel.*; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldPrepender; +import io.netty.handler.codec.bytes.ByteArrayEncoder; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import javax.net.ssl.SSLException; + +/** + * Created by suter on 27/01/16. + */ +@ChannelHandler.Sharable +public class TcpMulticastClientInitializer extends ChannelInboundHandlerAdapter { + + private SslContext sslCtx; + + public TcpMulticastClientInitializer(SslContext sslCtx){ + this.sslCtx = sslCtx; + } + + public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { + ChannelPipeline pipeline = ctx.channel().pipeline(); + pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); + pipeline.addLast("bytesEncoder", new ByteArrayEncoder()); + ctx.fireChannelRegistered(); + } + + public void channelActive(ChannelHandlerContext ctx) throws Exception { + SocketChannel ch = (SocketChannel)ctx.channel(); + + //need to add it here otherwise the host address and port are not yet available + if (sslCtx != null) { + ch.pipeline().addFirst("sslEncoder", sslCtx.newHandler(ch.alloc(), + ch.remoteAddress().getAddress().getHostAddress(), ch.remoteAddress().getPort())); + } + ctx.pipeline().remove(this); + ctx.fireChannelActive(); + } + + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + //logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause); + + try { + ChannelPipeline pipeline = ctx.pipeline(); + if(pipeline.context(this) != null) { + pipeline.remove(this); + } + } finally { + ctx.close(); + } + + } +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastBroadcaster.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastBroadcaster.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastBroadcaster.java (revision ) @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.netty.tcp; + +import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.security.MessageDigest; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * A broadcast mechanism that uses TCP. It is mainly used for testing. + */ +public final class TcpMulticastBroadcaster implements Broadcaster { + + static final Logger LOG = LoggerFactory.getLogger(TcpMulticastBroadcaster.class); + private static final Charset UTF8 = Charset.forName("UTF-8"); + + private final List listeners = new CopyOnWriteArrayList(); + private TcpMulticastClient tcpClient; + private TcpMulticastServer tcpServer; + private String ownListener; + private String ownKeyUUID = UUID.randomUUID().toString(); + private byte[] ownKey = ownKeyUUID.getBytes(UTF8); + + public TcpMulticastBroadcaster(String config) { + LOG.info("Init " + config); + init(config); + } + + public void init(String config) { + String[] parts = config.split(";"); + int startPort = 9800; + int endPort = 9810; + String key = ""; + + // for debugging, this will send everything to localhost: + // String[] sendTo = {"sendTo", "localhost"}; + + // by default, only the entries in the clusterNodes + // collection are used: + String[] sendTo = {"sendTo"}; + + //FIXME would have to be secure by default + boolean hasSSL = false; + + for (String p : parts) { + if (p.startsWith("ports ")) { + String[] ports = p.split(" "); + startPort = Integer.parseInt(ports[1]); + endPort = Integer.parseInt(ports[2]); + } else if (p.startsWith("key ")) { + key = p.split(" ")[1]; + } else if (p.startsWith("sendTo ")) { + sendTo = p.split(" "); + } else if (p.startsWith("ssl ")) { + hasSSL = Boolean.parseBoolean(p.split(" ")[1]); + } + } + + tcpClient = new TcpMulticastClient(hasSSL); + tcpServer = new TcpMulticastServer(listeners, hasSSL); + + try { + sendTo[0] = null; + MessageDigest messageDigest = MessageDigest.getInstance("SHA-256"); + if (key.length() > 0) { + ownKey = messageDigest.digest(key.getBytes(UTF8)); + } + Exception lastException = null; + for (int port = startPort; port <= endPort; port++) { + if (port == startPort || lastException != null) { + try { + tcpServer.start(port); + lastException = null; + } catch (Exception e) { + LOG.debug("Cannot open port " + port); + lastException = e; + // ignore + } + } + for (String send : sendTo) { + if (send != null && !send.isEmpty()) { + if (!send.isEmpty()) { + tcpClient.add(send+":"+port, ownKey); + } + } + } + } + if (lastException != null) { + throw lastException; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + tcpClient.tryConnect(); + } + + @Override + public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) { + HashMap clientInfo = new HashMap(); + clientInfo.put(DynamicBroadcastConfig.ID, ownKeyUUID); + int port = tcpServer.getPort(); + if (port > 0) { + String address = getLocalAddress(); + if (address != null) { + ownListener = address + ":" + port; + clientInfo.put(DynamicBroadcastConfig.LISTENER, ownListener); + } + } + broadcastConfig.connect(clientInfo); + readClients(broadcastConfig); + } + + private static String getLocalAddress() { + String bind = System.getProperty("oak.tcpBindAddress", null); + try { + InetAddress address; + if (bind != null && !bind.isEmpty()) { + address = InetAddress.getByName(bind); + } else { + address = InetAddress.getLocalHost(); + } + return address.getHostAddress(); + } catch (UnknownHostException e) { + return ""; + } + } + + private void readClients(DynamicBroadcastConfig b) { + List> list = b.getClientInfo(); + for (Map m : list) { + String listener = m.get(DynamicBroadcastConfig.LISTENER); + String id = m.get(DynamicBroadcastConfig.ID); + if (listener.equals(ownListener)) { + continue; + } + tcpClient.connect(listener, id.getBytes(UTF8)); + } + } + + @Override + public void send(ByteBuffer buff) { + ByteBuffer b = ByteBuffer.allocate(buff.remaining()); + b.put(buff); + b.flip(); + tcpClient.sendBuffer(b); + } + + @Override + public void addListener(Listener listener) { + listeners.add(listener); + } + + @Override + public void removeListener(Listener listener) { + listeners.remove(listener); + } + + @Override + public void close() { + tcpClient.close(); + tcpServer.stop(); + } + +} Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastServerInitializer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastServerInitializer.java (revision ) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/broadcast/netty/tcp/TcpMulticastServerInitializer.java (revision ) @@ -0,0 +1,64 @@ +package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.netty.tcp; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.ssl.SslContext; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster; + +import java.nio.ByteBuffer; +import java.util.List; + +/** + * Creates a newly configured {@link ChannelPipeline} for a new channel. + * Created by suter on 25/01/16. + */ +@ChannelHandler.Sharable +public class TcpMulticastServerInitializer extends ChannelInitializer { + + private final List listeners; + private final SslContext sslCtx; + + public TcpMulticastServerInitializer(SslContext sslCtx, List listeners) { + this.sslCtx = sslCtx; + this.listeners = listeners; + } + + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + if (sslCtx != null) { + pipeline.addLast(sslCtx.newHandler(ch.alloc())); + } + + pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); + pipeline.addLast("bytesDecoder", new CacheByteDecoder(listeners)); + } + + private static class CacheByteDecoder extends MessageToMessageDecoder { + + private List listeners; + + protected CacheByteDecoder(List listeners) { + super(); + this.listeners = listeners; + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { + byte[] array = new byte[msg.readableBytes()]; + msg.getBytes(0, array); + ByteBuffer buff = ByteBuffer.wrap(array); + int start = buff.position(); + for (Broadcaster.Listener l : listeners) { + buff.position(start); + l.receive(buff); + } + } + } +} Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java (revision 1727412) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/persistentCache/BroadcastTest.java (revision ) @@ -22,11 +22,17 @@ import java.io.File; import java.io.IOException; +import java.net.InetAddress; +import java.net.InterfaceAddress; +import java.net.NetworkInterface; +import java.net.SocketException; import java.nio.ByteBuffer; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Enumeration; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.jackrabbit.oak.cache.CacheLIRS; @@ -34,10 +40,9 @@ import org.apache.jackrabbit.oak.plugins.document.Revision; import org.apache.jackrabbit.oak.plugins.document.RevisionVector; import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster; -import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.TCPBroadcaster; +import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.netty.tcp.TcpMulticastBroadcaster; import org.apache.jackrabbit.oak.plugins.document.util.StringValue; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.LoggerFactory; @@ -50,41 +55,47 @@ import com.google.common.cache.Cache; public class BroadcastTest { - + public static void main(String... args) throws Exception { - listen(); + //listen(); benchmark(); } - + private static void benchmark() throws IOException { FileUtils.deleteDirectory(new File("target/broadcastTest")); - new File("target/broadcastTest").mkdirs(); + new File("target/broadcastTest").mkdirs(); - String type = "tcp:key 1;ports 9700 9800"; + String[] types = { + "tcp:key 1;ports 9700 9800", + "nettyTcp:key 1;ports 9700 9800" + }; ArrayList nodeList = new ArrayList(); - for (int nodes = 1; nodes < 20; nodes++) { + for (String type : types){ + System.out.println("TYPE: "+type); + for (int nodes = 1; nodes < 30; nodes++) { - PersistentCache pc = new PersistentCache("target/broadcastTest/p" + nodes + ",broadcast=" + type); - Cache cache = openCache(pc); - String key = "/test" + Math.random(); - PathRev k = new PathRev(key, new RevisionVector(new Revision(0, 0, 0))); - long time = System.currentTimeMillis(); + PersistentCache pc = new PersistentCache("target/broadcastTest/p" + nodes + ",broadcast=" + type); + Cache cache = openCache(pc); + String key = "/test" + Math.random(); + PathRev k = new PathRev(key, new RevisionVector(new Revision(0, 0, 0))); + long time = System.currentTimeMillis(); - for (int i = 0; i < 2000; i++) { + for (int i = 0; i < 200000; i++) { - cache.put(k, new StringValue("Hello World " + i)); - cache.invalidate(k); - cache.getIfPresent(k); - } - time = System.currentTimeMillis() - time; - System.out.println("nodes: " + nodes + " time: " + time); - nodeList.add(pc); - } - for (PersistentCache c : nodeList) { - c.close(); - } - } + cache.put(k, new StringValue("Hello World " + i)); + cache.invalidate(k); + cache.getIfPresent(k); + } + time = System.currentTimeMillis() - time; + System.out.println("nodes: " + nodes + " time: " + time); + nodeList.add(pc); + } + for (PersistentCache c : nodeList) { + c.close(); + } + } + } - + private static void listen() throws InterruptedException { String config = "key 123"; - + ConsoleAppender ca = new ConsoleAppender(); LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); ca.setContext(lc); @@ -94,12 +105,12 @@ pl.start(); ca.setLayout(pl); ca.start(); - + - ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(TCPBroadcaster.class); + ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(TcpMulticastBroadcaster.class); logger.addAppender(ca); logger.setLevel(Level.DEBUG); - + - TCPBroadcaster receiver = new TCPBroadcaster(config); + TcpMulticastBroadcaster receiver = new TcpMulticastBroadcaster(config); receiver.addListener(new Broadcaster.Listener() { @Override @@ -120,7 +131,7 @@ System.out.println(dateTime + " Received " + sb); buff.position(end); } - + }); Random r = new Random(); int x = r.nextInt(); @@ -133,16 +144,21 @@ buff.put(new byte[100]); buff.flip(); receiver.send(buff); - if (!receiver.isRunning()) { - System.out.println("Did not start or already stopped"); - break; - } + } - } Thread.sleep(Integer.MAX_VALUE); } - + - @Ignore("OAK-3887") @Test + public void broadcastTcpNettySsl() throws Exception { + broadcast("nettyTcp:sendTo localhost;key 123;ssl true", 90); + } + + @Test + public void broadcastTcpNetty() throws Exception { + broadcast("nettyTcp:sendTo localhost;key 123", 90); + } + + @Test public void broadcastTCP() throws Exception { broadcast("tcp:sendTo localhost;key 123", 90); } @@ -151,36 +167,58 @@ public void broadcastInMemory() throws Exception { broadcast("inMemory", 100); } - + @Test - @Ignore("OAK-2843") public void broadcastUDP() throws Exception { try { - broadcast("udp:sendTo localhost", 50); + String localBroadcastIp = getLocalBroadcastAddress(); + System.out.println("BROADCASTING TO: " + localBroadcastIp); + broadcast("udp:sendTo "+localBroadcastIp, 50); } catch (AssertionError e) { // IPv6 didn't work, so try with IPv4 - try { - broadcast("udp:group 228.6.7.9", 50); + broadcast("udp:group 228.6.7.9", 50); + try { } catch (AssertionError e2) { throwBoth(e, e2); - } + } } } - + @Test - @Ignore("OAK-2843") public void broadcastEncryptedUDP() throws Exception { try { - broadcast("udp:group FF78:230::1234;key test;port 9876;sendTo localhost;aes", 50); + String localBroadcastIp = getLocalBroadcastAddress(); + System.out.println("BROADCASTING TO: " + localBroadcastIp); + broadcast("udp:group FF78:230::1234;key test;port 9876;sendTo "+localBroadcastIp+";aes", 50); } catch (AssertionError e) { try { - broadcast("udp:group 228.6.7.9;key test;port 9876;aes", 50); + broadcast("udp:group 228.6.7.9;key test;port 9876;aes", 50); } catch (AssertionError e2) { throwBoth(e, e2); - } + } } } - + + private static String getLocalBroadcastAddress(){ + try { + Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); + while (interfaces.hasMoreElements()) { + NetworkInterface networkInterface = interfaces.nextElement(); + if (networkInterface.isLoopback()) + continue; + for (InterfaceAddress interfaceAddress : + networkInterface.getInterfaceAddresses()) { + InetAddress broadcast = interfaceAddress.getBroadcast(); + if (broadcast != null) + return broadcast.getHostAddress(); + } + } + } catch (SocketException e) { + e.printStackTrace(); + } + return null; + } + private static void throwBoth(AssertionError e, AssertionError e2) throws AssertionError { Throwable ex = e; while (ex.getCause() != null) { @@ -192,14 +230,18 @@ private static void broadcast(String type, int minPercentCorrect) throws Exception { FileUtils.deleteDirectory(new File("target/broadcastTest")); - new File("target/broadcastTest").mkdirs(); + new File("target/broadcastTest").mkdirs(); - PersistentCache p1 = new PersistentCache("target/broadcastTest/p1,broadcast=" + type); + PersistentCache p1 = new PersistentCache("target/broadcastTest/p1,broadcast="+type); - PersistentCache p2 = new PersistentCache("target/broadcastTest/p2,broadcast=" + type); + PersistentCache p2 = new PersistentCache("target/broadcastTest/p2,broadcast="+type); + //Wait for broadcasting client + server to get started asynchronously + //otherwise the first two messages will be dropped + TimeUnit.MILLISECONDS.sleep(200); Cache c1 = openCache(p1); Cache c2 = openCache(p2); String key = "/test" + Math.random(); PathRev k = new PathRev(key, new RevisionVector(new Revision(0, 0, 0))); int correct = 0; + for (int i = 0; i < 50; i++) { c1.put(k, new StringValue("Hello World " + i)); waitFor(c2, k, 100); @@ -217,10 +259,11 @@ } p1.close(); p2.close(); + System.out.println("min: " + minPercentCorrect + " got: " + correct); - Assert.assertTrue("min: " + minPercentCorrect + " got: " + correct, + Assert.assertTrue("min: " + minPercentCorrect + " got: " + correct, correct >= minPercentCorrect); } - + private static boolean waitFor(Callable call, int timeout) { long start = System.currentTimeMillis(); while (true) { @@ -242,7 +285,7 @@ } } } - + private static boolean waitFor(final Cache map, final K key, final V value, int timeout) { return waitFor(new Callable() { @Override @@ -250,12 +293,12 @@ V v = map.getIfPresent(key); if (value == null) { return v == null; - } + } return value.equals(v); } }, timeout); } - + private static boolean waitFor(final Cache map, final K key, int timeout) { return waitFor(new Callable() { @Override @@ -264,11 +307,11 @@ } }, timeout); } - + private static Cache openCache(PersistentCache p) { CacheLIRS cache = new CacheLIRS.Builder(). maximumSize(1).build(); - return p.wrap(null, null, cache, CacheType.DIFF); + return p.wrap(null, null, cache, CacheType.DIFF); } } Index: oak-core/pom.xml IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- oak-core/pom.xml (revision 1727391) +++ oak-core/pom.xml (revision ) @@ -31,6 +31,10 @@ Oak Core bundle + + 4.0.33.Final + + @@ -361,6 +365,30 @@ json-simple 1.1.1 test + + + io.netty + netty-handler + ${netty-version} + provided + + + io.netty + netty-buffer + ${netty-version} + provided + + + io.netty + netty-codec + ${netty-version} + provided + + + io.netty + netty-transport + ${netty-version} + provided