diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java index 239e061..dbe90d6 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java @@ -40,25 +40,45 @@ private ByteBuf buf; private byte[] singleByte = new byte[1]; private boolean closed = false; - private final Object channelWritabilityMonitor; + private final Object writeMonitor = new Object(); + private final int maxPendingWrites; + private volatile int pendingWrites = 0; - private ChannelFutureListener listener = new ChannelFutureListener() { + private ChannelFutureListener writeListener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + + pendingWrites--; + + if (future.isCancelled()) { + LOG.error("Write cancelled on ID " + id); + } else if (!future.isSuccess()) { + LOG.error("Write error on ID " + id, future.cause()); + } + + synchronized (writeMonitor) { + writeMonitor.notifyAll(); + } + } + }; + + private ChannelFutureListener closeListener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.isCancelled()) { - LOG.error(id + " was cancelled"); + LOG.error("Close cancelled on ID " + id); } else if (!future.isSuccess()) { - LOG.error("Error on ID " + id, future.cause()); + LOG.error("Close failed on ID " + id, future.cause()); } } }; - public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize, final Object monitor) { + public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize, int maxOutstandingWrites) { this.chc = chc; this.id = id; this.bufSize = bufSize; this.buf = chc.alloc().buffer(bufSize); - this.channelWritabilityMonitor = monitor; + this.maxPendingWrites = maxOutstandingWrites; } @Override @@ -109,10 +129,13 @@ public void close() throws IOException { LOG.error("Error flushing stream before close", err); } + closed = true; + + // Wait for all writes to finish before we actually close. + waitForWritesToFinish(0); + try { - chc.close().addListener(listener).sync(); - } catch (InterruptedException err) { - throw new IOException(err); + chc.close().addListener(closeListener); } finally { buf.release(); buf = null; @@ -121,26 +144,30 @@ public void close() throws IOException { } } + private void waitForWritesToFinish(int desiredWriteCount) throws IOException { + synchronized (writeMonitor) { + // to prevent spurious wake up + while (pendingWrites > desiredWriteCount) { + try { + writeMonitor.wait(); + } catch (InterruptedException ie) { + throw new IOException("Interrupted while waiting for write operations to finish for " + id); + } + } + } + } + private void writeToChannel() throws IOException { if (closed) { throw new IOException("Already closed: " + id); } - chc.writeAndFlush(buf.copy()).addListener(listener); - buf.clear(); + // Wait if we have exceeded our max pending write count + waitForWritesToFinish(maxPendingWrites - 1); - // if underlying channel is not writable (perhaps because of slow consumer) wait for - // notification about writable state change - synchronized (channelWritabilityMonitor) { - // to prevent spurious wake up - while (!chc.channel().isWritable()) { - try { - channelWritabilityMonitor.wait(); - } catch (InterruptedException e) { - throw new IOException("Interrupted when waiting for channel writability state change", e); - } - } - } + pendingWrites++; + chc.writeAndFlush(buf.copy()).addListener(writeListener); + buf.clear(); } private void writeInternal(byte[] b, int off, int len) throws IOException { diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index 825488f..4f19baa 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -166,7 +166,7 @@ public int getPort() { protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler { private final int sendBufferSize; - private final Object channelWritabilityMonitor = new Object(); + public LlapOutputFormatServiceHandler(final int sendBufferSize) { this.sendBufferSize = sendBufferSize; } @@ -194,9 +194,10 @@ private void registerReader(ChannelHandlerContext ctx, String id, byte[] tokenBy } } LOG.debug("registering socket for: " + id); + int maxPendingWrites = 2; @SuppressWarnings("rawtypes") LlapRecordWriter writer = new LlapRecordWriter( - new ChannelOutputStream(ctx, id, sendBufferSize, channelWritabilityMonitor)); + new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites)); boolean isFailed = true; synchronized (lock) { if (!writers.containsKey(id)) { @@ -222,14 +223,6 @@ private void failChannel(ChannelHandlerContext ctx, String id, String error) { } LOG.error(error); } - - @Override - public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception { - super.channelWritabilityChanged(ctx); - synchronized (channelWritabilityMonitor) { - channelWritabilityMonitor.notifyAll(); - } - } } protected class LlapOutputFormatChannelCloseListener implements ChannelFutureListener {