Index: core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java =================================================================== --- core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (revision 1692352) +++ core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java (working copy) @@ -25,6 +25,7 @@ import java.net.BindException; import java.net.InetSocketAddress; import java.util.Map; +import java.util.concurrent.ExecutionException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -100,12 +101,19 @@ server.start(); LOG.info("BSPPeer address:" + server.getAddress().getHostName() + " port:" + server.getAddress().getPort()); - } catch (BindException e) { - LOG.warn("Address already in use. Retrying " + hostName + ":" + port + 1); - if (retry++ >= MAX_RETRY) { - throw new RuntimeException("RPC Server could not be launched!"); + } catch (InterruptedException e) { + e.printStackTrace(); + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + e.printStackTrace(); + if (e.getCause() instanceof BindException) { + final int nextPort = port + 1; + LOG.warn("Address already in use. Retrying " + hostName + ":" + nextPort); + if (retry++ >= MAX_RETRY) { + throw new RuntimeException("RPC Server could not be launched!"); + } + startServer(hostName, nextPort); } - startServer(hostName, port + 1); } } Index: core/src/main/java/org/apache/hama/ipc/AsyncServer.java =================================================================== --- core/src/main/java/org/apache/hama/ipc/AsyncServer.java (revision 1692352) +++ core/src/main/java/org/apache/hama/ipc/AsyncServer.java (working copy) @@ -19,34 +19,13 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.util.ReferenceCountUtil; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.WritableByteChannel; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - +import io.netty.util.concurrent.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -60,6 +39,17 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import java.io.*; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.Future; + /** * An abstract IPC service using netty. IPC calls take a single {@link Writable} * as a parameter, and return a {@link Writable}* @@ -171,54 +161,59 @@ } /** start server listener */ - public void start() { - new NioServerListener().start(); + public void start() throws ExecutionException, InterruptedException { + ExecutorService es = Executors.newSingleThreadExecutor(); + Future future = es.submit(new NioServerListener()); + try { + ChannelFuture closeFuture = future.get(); + closeFuture.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(io.netty.util.concurrent.Future voidFuture) throws Exception { + // Stop the server gracefully if it's not terminated. + stop(); + } + }); + } finally { + es.shutdown(); + } } - private class NioServerListener extends Thread { + private class NioServerListener implements Callable { /** * Configure and start nio server */ @Override - public void run() { + public ChannelFuture call() throws Exception { SERVER.set(AsyncServer.this); - try { - // ServerBootstrap is a helper class that sets up a server - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .option(ChannelOption.SO_BACKLOG, backlogLength) - .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT) - .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay) - .childOption(ChannelOption.SO_KEEPALIVE, true) - .childOption(ChannelOption.SO_RCVBUF, 30 * 1024 * 1024) - .childOption(ChannelOption.RCVBUF_ALLOCATOR, - new FixedRecvByteBufAllocator(100 * 1024)) + // ServerBootstrap is a helper class that sets up a server + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, backlogLength) + .childOption(ChannelOption.MAX_MESSAGES_PER_READ, NIO_BUFFER_LIMIT) + .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.SO_RCVBUF, 30 * 1024 * 1024) + .childOption(ChannelOption.RCVBUF_ALLOCATOR, + new FixedRecvByteBufAllocator(100 * 1024)) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline p = ch.pipeline(); - // Register accumulation processing handler - p.addLast(new NioFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 0)); - // Register message processing handler - p.addLast(new NioServerInboundHandler()); - } - }); + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + // Register accumulation processing handler + p.addLast(new NioFrameDecoder(100 * 1024 * 1024, 0, 4, 0, 0)); + // Register message processing handler + p.addLast(new NioServerInboundHandler()); + } + }); - // Bind and start to accept incoming connections. - ChannelFuture f = b.bind(port).sync(); - LOG.info("AsyncServer startup"); - // Wait until the server socket is closed. - f.channel().closeFuture().sync(); - } catch (Exception e) { - e.printStackTrace(); - } finally { - // Shut down Server gracefully - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); - } + // Bind and start to accept incoming connections. + ChannelFuture f = b.bind(port).sync(); + LOG.info("AsyncServer startup"); + + return f.channel().closeFuture(); } }