diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java index e861791..239e061 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java @@ -40,6 +40,7 @@ private ByteBuf buf; private byte[] singleByte = new byte[1]; private boolean closed = false; + private final Object channelWritabilityMonitor; private ChannelFutureListener listener = new ChannelFutureListener() { @Override @@ -52,11 +53,12 @@ public void operationComplete(ChannelFuture future) { } }; - public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize) { + public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize, final Object monitor) { this.chc = chc; this.id = id; this.bufSize = bufSize; this.buf = chc.alloc().buffer(bufSize); + this.channelWritabilityMonitor = monitor; } @Override @@ -124,8 +126,21 @@ private void writeToChannel() throws IOException { throw new IOException("Already closed: " + id); } - chc.write(buf.copy()).addListener(listener); + chc.writeAndFlush(buf.copy()).addListener(listener); buf.clear(); + + // if underlying channel is not writable (perhaps because of slow consumer) wait for + // notification about writable state change + synchronized (channelWritabilityMonitor) { + // to prevent spurious wake up + while (!chc.channel().isWritable()) { + try { + channelWritabilityMonitor.wait(); + } catch (InterruptedException e) { + throw new IOException("Interrupted when waiting for channel writability state change", e); + } + } + } } private void writeInternal(byte[] b, int off, int len) throws IOException { diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index 151a31f..825488f 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -166,6 +166,7 @@ public int getPort() { protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler { private final int sendBufferSize; + private final Object channelWritabilityMonitor = new Object(); public LlapOutputFormatServiceHandler(final int sendBufferSize) { this.sendBufferSize = sendBufferSize; } @@ -195,7 +196,7 @@ private void registerReader(ChannelHandlerContext ctx, String id, byte[] tokenBy LOG.debug("registering socket for: " + id); @SuppressWarnings("rawtypes") LlapRecordWriter writer = new LlapRecordWriter( - new ChannelOutputStream(ctx, id, sendBufferSize)); + new ChannelOutputStream(ctx, id, sendBufferSize, channelWritabilityMonitor)); boolean isFailed = true; synchronized (lock) { if (!writers.containsKey(id)) { @@ -221,6 +222,14 @@ private void failChannel(ChannelHandlerContext ctx, String id, String error) { } LOG.error(error); } + + @Override + public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception { + super.channelWritabilityChanged(ctx); + synchronized (channelWritabilityMonitor) { + channelWritabilityMonitor.notifyAll(); + } + } } protected class LlapOutputFormatChannelCloseListener implements ChannelFutureListener {