diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index cb38839..438bff1 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.llap;
 
+import com.google.common.base.Preconditions;
+
 import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.EOFException;
@@ -55,21 +57,18 @@
   protected Thread readerThread = null;
   protected final LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
-  protected final long timeout;
   protected final Closeable client;
   private final Closeable socket;
   private boolean closed = false;
 
   public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz, JobConf job,
       Closeable client, Closeable socket) {
-    this.cin = new ChunkedInputStream(in); // Save so we can verify end of stream
+    this.cin = new ChunkedInputStream(in, client.toString()); // Save so we can verify end of stream
     // We need mark support - wrap with BufferedInputStream.
     din = new DataInputStream(new BufferedInputStream(cin));
     this.schema = schema;
     this.clazz = clazz;
     this.readerThread = Thread.currentThread();
-    this.timeout = 3 * HiveConf.getTimeVar(job,
-        HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
     this.client = client;
     this.socket = socket;
   }
@@ -273,10 +272,8 @@ protected boolean hasInput() throws IOException {
 
   protected ReaderEvent getReaderEvent() throws IOException {
     try {
-      ReaderEvent event = readerEvents.poll(timeout, TimeUnit.MILLISECONDS);
-      if (event == null) {
-        throw new IOException("Timed out getting readerEvents");
-      }
+      ReaderEvent event = readerEvents.take();
+      Preconditions.checkNotNull(event);
       return event;
     } catch (InterruptedException ie) {
       throw new RuntimeException("Interrupted while getting readerEvents, not expected: " + ie.getMessage(), ie);
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 4304b52..ee7d0d3 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -320,6 +320,18 @@ void setLastHeartbeat(long lastHeartbeat) {
     this.requestInfo.lastHeartbeat.set(lastHeartbeat);
   }
 
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("LlapTaskUmbilicalExternalClient");
+    if (requestInfo != null) {
+      sb.append("(");
+      sb.append(requestInfo.taskAttemptId);
+      sb.append(")");
+    }
+    return sb.toString();
+  }
+
   // Periodic task to time out submitted tasks that have not been updated with umbilical heartbeat.
   private static class HeartbeatCheckTask implements Runnable {
     LlapTaskUmbilicalExternalImpl umbilicalImpl;
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
index 4e1c65f..1c97403 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
@@ -35,13 +35,17 @@
   private int unreadBytes = 0; // Bytes remaining in the current chunk of data
   private byte[] singleByte = new byte[1];
   private boolean endOfData = false;
+  private String id;
 
-  public ChunkedInputStream(InputStream in) {
+  public ChunkedInputStream(InputStream in, String id) {
     din = new DataInputStream(in);
+    this.id = id;
+    LOG.debug("Creating chunked input for {}", id);
   }
 
   @Override
   public void close() throws IOException {
+    LOG.debug("{}: Closing chunked input.", id);
     din.close();
   }
@@ -56,7 +60,7 @@ public int read(byte[] b, int off, int len) throws IOException {
     int bytesRead = 0;
 
     if (len < 0) {
-      throw new IllegalArgumentException("Negative read length");
+      throw new IllegalArgumentException(id + ": Negative read length");
     } else if (len == 0) {
       return 0;
     }
@@ -67,15 +71,15 @@
         // Find the next chunk size
        unreadBytes = din.readInt();
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Chunk size " + unreadBytes);
+          LOG.debug("{}: Chunk size {}", id, unreadBytes);
         }
         if (unreadBytes == 0) {
-          LOG.debug("Hit end of data");
+          LOG.debug("{}: Hit end of data", id);
           endOfData = true;
           return -1;
         }
       } catch (IOException err) {
-        throw new IOException("Error while attempting to read chunk length", err);
+        throw new IOException(id + ": Error while attempting to read chunk length", err);
       }
     }
 
@@ -83,7 +87,7 @@
       try {
         din.readFully(b, off, bytesToRead);
       } catch (IOException err) {
-        throw new IOException("Error while attempting to read " + bytesToRead + " bytes from current chunk", err);
+        throw new IOException(id + ": Error while attempting to read " + bytesToRead + " bytes from current chunk", err);
       }
       unreadBytes -= bytesToRead;
       bytesRead += bytesToRead;
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
index 31815d5..3a82915 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
@@ -35,13 +35,16 @@
   private byte[] singleByte = new byte[1];
   private byte[] buffer;
   private int bufPos = 0;
+  private String id;
 
-  public ChunkedOutputStream(OutputStream out, int bufSize) {
+  public ChunkedOutputStream(OutputStream out, int bufSize, String id) {
+    LOG.debug("Creating chunked output stream: {}", id);
     if (bufSize <= 0) {
       throw new IllegalArgumentException("Positive bufSize required, was " + bufSize);
     }
     buffer = new byte[bufSize];
     dout = new DataOutputStream(out);
+    this.id = id;
   }
 
   @Override
@@ -74,7 +77,7 @@
     // Write final 0-length chunk
     writeChunk();
 
-    LOG.debug("ChunkedOutputStream: Closing underlying output stream.");
+    LOG.debug("{}: Closing underlying output stream.", id);
     dout.close();
   }
 
@@ -89,7 +92,7 @@
 
   private void writeChunk() throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Writing chunk of size " + bufPos);
+      LOG.debug("{}: Writing chunk of size {}", id, bufPos);
     }
 
     // First write chunk length
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java b/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
index 47209c2..ebbf3a4 100644
--- a/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
@@ -150,8 +150,8 @@ public TestStreams(boolean useChunkedStream) throws Exception {
       pout = new PipedOutputStream();
       pin = new PipedInputStream(pout);
       if (useChunkedStream) {
-        out = new ChunkedOutputStream(pout, bufferSize);
-        in = new ChunkedInputStream(pin);
+        out = new ChunkedOutputStream(pout, bufferSize, "test");
+        in = new ChunkedInputStream(pin, "test");
       } else {
         // Test behavior with non-chunked streams
         out = new FilterOutputStream(pout);
@@ -209,7 +209,7 @@ public void testBasicUsage() throws Exception {
       chunkedStreams.values = values;
       BasicUsageWriter writer2 = new BasicUsageWriter(chunkedStreams, false, false);
       BasicUsageReader reader2 = new BasicUsageReader(chunkedStreams);
-      runTest(writer2, reader2, nonChunkedStreams);
+      runTest(writer2, reader2, chunkedStreams);
       assertTrue(reader2.allValuesRead);
       assertTrue(((ChunkedInputStream) chunkedStreams.in).isEndOfData());
       assertNull(writer2.getError());
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
index dbe90d6..fe09f58 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
@@ -23,6 +23,7 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.Semaphore;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,25 +41,19 @@
   private ByteBuf buf;
   private byte[] singleByte = new byte[1];
   private boolean closed = false;
-  private final Object writeMonitor = new Object();
   private final int maxPendingWrites;
-  private volatile int pendingWrites = 0;
+  private final Semaphore writeResources;
 
   private ChannelFutureListener writeListener = new ChannelFutureListener() {
     @Override
     public void operationComplete(ChannelFuture future) {
-
-      pendingWrites--;
+      writeResources.release();
 
       if (future.isCancelled()) {
         LOG.error("Write cancelled on ID " + id);
       } else if (!future.isSuccess()) {
         LOG.error("Write error on ID " + id, future.cause());
       }
-
-      synchronized (writeMonitor) {
-        writeMonitor.notifyAll();
-      }
     }
   };
@@ -79,6 +74,7 @@ public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize, in
     this.bufSize = bufSize;
     this.buf = chc.alloc().buffer(bufSize);
     this.maxPendingWrites = maxOutstandingWrites;
+    this.writeResources = new Semaphore(maxPendingWrites);
   }
 
   @Override
@@ -126,13 +122,13 @@ public void close() throws IOException {
     try {
       flush();
     } catch (IOException err) {
-      LOG.error("Error flushing stream before close", err);
+      LOG.error("Error flushing stream before close on " + id, err);
     }
 
     closed = true;
 
     // Wait for all writes to finish before we actually close.
-    waitForWritesToFinish(0);
+    takeWriteResources(maxPendingWrites);
 
     try {
       chc.close().addListener(closeListener);
@@ -144,16 +140,12 @@
     }
   }
 
-  private void waitForWritesToFinish(int desiredWriteCount) throws IOException {
-    synchronized (writeMonitor) {
-      // to prevent spurious wake up
-      while (pendingWrites > desiredWriteCount) {
-        try {
-          writeMonitor.wait();
-        } catch (InterruptedException ie) {
-          throw new IOException("Interrupted while waiting for write operations to finish for " + id);
-        }
-      }
+  // Attempt to acquire write resources, waiting if they are not available.
+  private void takeWriteResources(int numResources) throws IOException {
+    try {
+      writeResources.acquire(numResources);
+    } catch (InterruptedException ie) {
+      throw new IOException("Interrupted while waiting for write resources for " + id, ie);
     }
   }
 
@@ -162,10 +154,7 @@ private void writeToChannel() throws IOException {
       throw new IOException("Already closed: " + id);
     }
 
-    // Wait if we have exceeded our max pending write count
-    waitForWritesToFinish(maxPendingWrites - 1);
-
-    pendingWrites++;
+    takeWriteResources(1);
     chc.writeAndFlush(buf.copy()).addListener(writeListener);
     buf.clear();
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index c732ba1..586006b 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -198,10 +198,10 @@ private void registerReader(ChannelHandlerContext ctx, String id, byte[] tokenBy
     int maxPendingWrites = HiveConf.getIntVar(conf,
         HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
     @SuppressWarnings("rawtypes")
-    LlapRecordWriter writer = new LlapRecordWriter(
+    LlapRecordWriter writer = new LlapRecordWriter(id,
         new ChunkedOutputStream(
             new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites),
-            sendBufferSize));
+            sendBufferSize, id));
     boolean isFailed = true;
     synchronized (lock) {
       if (!writers.containsKey(id)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
index b632fae..02588cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
@@ -33,15 +33,17 @@ implements RecordWriter<K, V> {
   public static final Logger LOG = LoggerFactory.getLogger(LlapRecordWriter.class);
 
+  String id;
   DataOutputStream dos;
 
-  public LlapRecordWriter(OutputStream out) {
+  public LlapRecordWriter(String id, OutputStream out) {
+    this.id = id;
     dos = new DataOutputStream(out);
   }
 
   @Override
   public void close(Reporter reporter) throws IOException {
-    LOG.info("CLOSING the record writer output stream");
+    LOG.info("CLOSING the record writer output stream for " + id);
     dos.close();
   }
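
Note on the ChannelOutputStream change: the monitor-based wait/notify bookkeeping is replaced by a counting Semaphore, which encodes the same invariant (at most maxPendingWrites writes in flight) without the spurious-wakeup loop, and lets close() drain every permit to wait for all outstanding writes. A minimal standalone sketch of the pattern follows; BoundedAsyncWriter and its method names are illustrative, not part of this patch.

// Sketch of the Semaphore-based write backpressure pattern adopted above.
import java.io.IOException;
import java.util.concurrent.Semaphore;

class BoundedAsyncWriter {
  private final int maxPendingWrites;
  private final Semaphore writeResources;

  BoundedAsyncWriter(int maxPendingWrites) {
    this.maxPendingWrites = maxPendingWrites;
    this.writeResources = new Semaphore(maxPendingWrites);
  }

  // Writer thread: take one permit before issuing an async write. Blocks
  // once maxPendingWrites writes are already in flight.
  void beginWrite() throws IOException {
    acquire(1);
    // ... issue the asynchronous write; its completion callback calls endWrite()
  }

  // I/O completion callback: return the permit, mirroring the
  // ChannelFutureListener in the patch.
  void endWrite() {
    writeResources.release();
  }

  // Close path: re-acquiring every permit proves no write is still in flight.
  void awaitAllWrites() throws IOException {
    acquire(maxPendingWrites);
  }

  private void acquire(int permits) throws IOException {
    try {
      writeResources.acquire(permits);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for write resources", ie);
    }
  }
}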
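
For reviewers unfamiliar with the framing these streams implement: ChunkedOutputStream writes each buffer as a 4-byte length prefix followed by the payload, and a zero-length chunk marks end of data, which is what ChunkedInputStream's readInt()/readFully() loop consumes before isEndOfData() turns true. A self-contained illustration of that wire format, assuming nothing beyond java.io (the demo class and names are hypothetical, not from the patch):

// Round-trips a payload using the same length-prefixed chunk framing.
import java.io.*;

class ChunkFramingDemo {
  static void writeChunked(DataOutputStream out, byte[] data, int chunkSize) throws IOException {
    for (int off = 0; off < data.length; off += chunkSize) {
      int len = Math.min(chunkSize, data.length - off);
      out.writeInt(len);          // 4-byte big-endian chunk length prefix
      out.write(data, off, len);  // chunk payload
    }
    out.writeInt(0);              // 0-length chunk terminates the stream
    out.flush();
  }

  static byte[] readChunked(DataInputStream in) throws IOException {
    ByteArrayOutputStream result = new ByteArrayOutputStream();
    int len;
    while ((len = in.readInt()) > 0) {  // stop at the 0-length terminator
      byte[] chunk = new byte[len];
      in.readFully(chunk);
      result.write(chunk);
    }
    return result.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    writeChunked(new DataOutputStream(bos), "hello chunked world".getBytes(), 5);
    byte[] back = readChunked(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    System.out.println(new String(back));  // prints: hello chunked world
  }
}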