diff --git llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java index dce0c56..085c977 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java @@ -74,7 +74,6 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.common.security.JobTokenIdentifier; @@ -88,6 +87,7 @@ import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; @@ -109,9 +109,15 @@ import org.jboss.netty.handler.codec.http.HttpResponseEncoder; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.QueryStringDecoder; +import org.jboss.netty.handler.timeout.IdleState; +import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler; +import org.jboss.netty.handler.timeout.IdleStateEvent; +import org.jboss.netty.handler.timeout.IdleStateHandler; import org.jboss.netty.handler.ssl.SslHandler; import org.jboss.netty.handler.stream.ChunkedWriteHandler; import org.jboss.netty.util.CharsetUtil; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timer; public class ShuffleHandler implements AttemptRegistrationListener { @@ -208,6 +214,7 @@ final boolean connectionKeepAliveEnabled; final int connectionKeepAliveTimeOut; final int mapOutputMetaInfoCacheSize; + Timer timer; private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(SHUFFLE_HANDLER_LOCAL_DIRS); private final Shuffle shuffle; @@ -311,8 +318,10 @@ private ShuffleHandler(Configuration conf) { public void start() throws Exception { ServerBootstrap bootstrap = new ServerBootstrap(selector); + // Timer is shared across entire factory and must be released separately + timer = new HashedWheelTimer(); try { - pipelineFact = new HttpPipelineFactory(conf); + pipelineFact = new HttpPipelineFactory(conf, timer); } catch (Exception ex) { throw new RuntimeException(ex); } @@ -476,6 +485,10 @@ protected void stop() throws Exception { if (pipelineFact != null) { pipelineFact.destroy(); } + if (timer != null) { + // Release this shared timer resource + timer.stop(); + } if (dirWatcher != null) { dirWatcher.stop(); } @@ -506,12 +519,22 @@ private void removeJobShuffleInfo(String appIdString) { userRsrc.remove(appIdString); } + private static class TimeoutHandler extends IdleStateAwareChannelHandler { + @Override + public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) { + if (e.getState() == IdleState.WRITER_IDLE) { + e.getChannel().close(); + } + } + } + class HttpPipelineFactory implements ChannelPipelineFactory { final Shuffle SHUFFLE; private SSLFactory sslFactory; + private final ChannelHandler idleStateHandler; - public HttpPipelineFactory(Configuration conf) throws Exception { + public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception { SHUFFLE = getShuffle(conf); // TODO Setup SSL Shuffle // if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, @@ -520,6 +543,7 @@ public HttpPipelineFactory(Configuration conf) throws Exception { // sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf); // sslFactory.init(); // } + this.idleStateHandler = new IdleStateHandler(timer, 0, connectionKeepAliveTimeOut, 0); } public void destroy() { @@ -539,6 +563,8 @@ public ChannelPipeline getPipeline() throws Exception { pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("chunking", new ChunkedWriteHandler()); pipeline.addLast("shuffle", SHUFFLE); + pipeline.addLast("idle", idleStateHandler); + pipeline.addLast("timeout", new TimeoutHandler()); return pipeline; // TODO factor security manager into pipeline // TODO factor out encode/decode to permit binary shuffle @@ -752,7 +778,10 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) return; } } - lastMap.addListener(ChannelFutureListener.CLOSE); + // If Keep alive is enabled, do not close the connection. + if (!keepAliveParam && !connectionKeepAliveEnabled) { + lastMap.addListener(ChannelFutureListener.CLOSE); + } } private String getErrorMessage(Throwable t) {