diff --git llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java index 0705225..f63375c 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java @@ -209,6 +209,7 @@ private static final AtomicBoolean started = new AtomicBoolean(false); private static final AtomicBoolean initing = new AtomicBoolean(false); private static ShuffleHandler INSTANCE; + private static final String TIMEOUT_HANDLER = "timeout"; final boolean connectionKeepAliveEnabled; @@ -520,9 +521,16 @@ private void removeJobShuffleInfo(String appIdString) { } private static class TimeoutHandler extends IdleStateAwareChannelHandler { + + private boolean enabledTimeout; + + void setEnabledTimeout(boolean enabledTimeout) { + this.enabledTimeout = enabledTimeout; + } + @Override public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) { - if (e.getState() == IdleState.WRITER_IDLE) { + if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) { e.getChannel().close(); } } @@ -564,7 +572,7 @@ public ChannelPipeline getPipeline() throws Exception { pipeline.addLast("chunking", new ChunkedWriteHandler()); pipeline.addLast("shuffle", SHUFFLE); pipeline.addLast("idle", idleStateHandler); - pipeline.addLast("timeout", new TimeoutHandler()); + pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler()); return pipeline; // TODO factor security manager into pipeline // TODO factor out encode/decode to permit binary shuffle @@ -741,6 +749,13 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) Map mapOutputInfoMap = new HashMap(); Channel ch = evt.getChannel(); + + // In case of KeepAlive, ensure that timeout handler does not close connection until entire + // response is written (i.e, response headers + mapOutput). + ChannelPipeline pipeline = ch.getPipeline(); + TimeoutHandler timeoutHandler = (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER); + timeoutHandler.setEnabledTimeout(false); + String user = userRsrc.get(jobId); try { @@ -781,6 +796,9 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) // If Keep alive is enabled, do not close the connection. if (!keepAliveParam && !connectionKeepAliveEnabled) { lastMap.addListener(ChannelFutureListener.CLOSE); + } else { + // Entire response is written out. Safe to enable timeout handling. + timeoutHandler.setEnabledTimeout(true); } }