diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index 59dec1b..7fff147 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.llap;
 
+import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
@@ -29,6 +30,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.io.ChunkedInputStream;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.NullWritable;
@@ -46,6 +48,7 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapBaseRecordReader.class);
 
+  protected final ChunkedInputStream cin;
   protected final DataInputStream din;
   protected final Schema schema;
   protected final Class<V> clazz;
@@ -58,7 +61,9 @@
   public LlapBaseRecordReader(InputStream in, Schema schema, Class<V> clazz,
       JobConf job, Closeable client, Closeable socket) {
-    din = new DataInputStream(in);
+    this.cin = new ChunkedInputStream(in); // Save so we can verify end of stream
+    // We need mark support - wrap with BufferedInputStream.
+    din = new DataInputStream(new BufferedInputStream(cin));
     this.schema = schema;
     this.clazz = clazz;
     this.readerThread = Thread.currentThread();
@@ -129,19 +134,27 @@ public boolean next(NullWritable key, V value) throws IOException {
       // Need a way to know what thread to interrupt, since this is a blocking thread.
       setReaderThread(Thread.currentThread());
 
-      value.readFields(din);
-      return true;
-    } catch (EOFException eof) {
-      // End of input. There should be a reader event available, or coming soon, so okay to be blocking call.
-      ReaderEvent event = getReaderEvent();
-      switch (event.getEventType()) {
-        case DONE:
-          break;
-        default:
-          throw new IOException("Expected reader event with done status, but got "
-              + event.getEventType() + " with message " + event.getMessage());
+      if (hasInput()) {
+        value.readFields(din);
+        return true;
+      } else {
+        // End of input. Confirm we got end of stream indicator from server,
+        // as well as DONE status from fragment execution.
+        if (!cin.isEndOfData()) {
+          throw new IOException("Hit end of input, but did not find expected end of data indicator");
+        }
+
+        // There should be a reader event available, or coming soon, so okay to be blocking call.
+        ReaderEvent event = getReaderEvent();
+        switch (event.getEventType()) {
+          case DONE:
+            break;
+          default:
+            throw new IOException("Expected reader event with done status, but got "
+                + event.getEventType() + " with message " + event.getMessage());
+        }
+        return false;
       }
-      return false;
     } catch (IOException io) {
       if (Thread.interrupted()) {
         // Either we were interrupted by one of:
@@ -232,6 +245,15 @@ public void handleEvent(ReaderEvent event) {
     }
   }
 
+  protected boolean hasInput() throws IOException {
+    din.mark(1);
+    if (din.read() >= 0) {
+      din.reset();
+      return true;
+    }
+    return false;
+  }
+
   protected ReaderEvent getReaderEvent() throws IOException {
     try {
       ReaderEvent event = readerEvents.poll(timeout, TimeUnit.MILLISECONDS);
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
new file mode 100644
index 0000000..4e1c65f
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Data is expected to be a series of data chunks in the form <chunk size><chunk bytes>.
+// The final data chunk should be a 0-length chunk which will indicate end of input.
+public class ChunkedInputStream extends InputStream {
+
+  static final private Logger LOG = LoggerFactory.getLogger(ChunkedInputStream.class);
+
+  private DataInputStream din;
+  private int unreadBytes = 0; // Bytes remaining in the current chunk of data
+  private byte[] singleByte = new byte[1];
+  private boolean endOfData = false;
+
+  public ChunkedInputStream(InputStream in) {
+    din = new DataInputStream(in);
+  }
+
+  @Override
+  public void close() throws IOException {
+    din.close();
+  }
+
+  @Override
+  public int read() throws IOException {
+    int bytesRead = read(singleByte, 0, 1);
+    return (bytesRead == -1) ? -1 : (singleByte[0] & 0xff); // mask so bytes >= 128 are not returned as negative
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    int bytesRead = 0;
+
+    if (len < 0) {
+      throw new IllegalArgumentException("Negative read length");
+    } else if (len == 0) {
+      return 0;
+    }
+
+    // If there is a current unread chunk, read from that, or else get the next chunk.
+    if (unreadBytes == 0) {
+      try {
+        // Find the next chunk size
+        unreadBytes = din.readInt();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Chunk size " + unreadBytes);
+        }
+        if (unreadBytes == 0) {
+          LOG.debug("Hit end of data");
+          endOfData = true;
+          return -1;
+        }
+      } catch (IOException err) {
+        throw new IOException("Error while attempting to read chunk length", err);
+      }
+    }
+
+    int bytesToRead = Math.min(len, unreadBytes);
+    try {
+      din.readFully(b, off, bytesToRead);
+    } catch (IOException err) {
+      throw new IOException("Error while attempting to read " + bytesToRead + " bytes from current chunk", err);
+    }
+    unreadBytes -= bytesToRead;
+    bytesRead += bytesToRead;
+
+    return bytesRead;
+  }
+
+  public boolean isEndOfData() {
+    return endOfData;
+  }
+}
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
new file mode 100644
index 0000000..31815d5
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Writes data out as a series of chunks in the form <chunk size><chunk bytes>.
+// Closing the output stream will send a final 0-length chunk which will indicate end of input.
+public class ChunkedOutputStream extends OutputStream {
+
+  static final private Logger LOG = LoggerFactory.getLogger(ChunkedOutputStream.class);
+
+  private DataOutputStream dout;
+  private byte[] singleByte = new byte[1];
+  private byte[] buffer;
+  private int bufPos = 0;
+
+  public ChunkedOutputStream(OutputStream out, int bufSize) {
+    if (bufSize <= 0) {
+      throw new IllegalArgumentException("Positive bufSize required, was " + bufSize);
+    }
+    buffer = new byte[bufSize];
+    dout = new DataOutputStream(out);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    singleByte[0] = (byte) b;
+    write(singleByte, 0, 1);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    int bytesWritten = 0;
+    while (bytesWritten < len) {
+      // Copy the data to the buffer
+      int bytesToWrite = Math.min(len - bytesWritten, buffer.length - bufPos);
+      System.arraycopy(b, off + bytesWritten, buffer, bufPos, bytesToWrite);
+      bytesWritten += bytesToWrite;
+      bufPos += bytesToWrite;
+
+      // If we've filled the buffer, write it out
+      if (bufPos == buffer.length) {
+        writeChunk();
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    flush();
+
+    // Write final 0-length chunk
+    writeChunk();
+
+    LOG.debug("ChunkedOutputStream: Closing underlying output stream.");
+    dout.close();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // Write any remaining bytes to the out stream.
+    if (bufPos > 0) {
+      writeChunk();
+      dout.flush();
+    }
+  }
+
+  private void writeChunk() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Writing chunk of size " + bufPos);
+    }
+
+    // First write chunk length
+    dout.writeInt(bufPos);
+
+    // Then write chunk bytes
+    dout.write(buffer, 0, bufPos);
+
+    bufPos = 0; // reset buffer
+  }
+}
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java b/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
new file mode 100644
index 0000000..47209c2
--- /dev/null
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.hive.common.type.RandomTypeUtil;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestChunkedInputStream {
+
+  static int bufferSize = 128;
+  static Random rand = new Random();
+  static String alphabet = "abcdefghijklmnopqrstuvwxyz";
+
+  static class StreamTester {
+    Exception error = null;
+
+    public Exception getError() {
+      return error;
+    }
+
+    public void setError(Exception error) {
+      this.error = error;
+    }
+  }
+
+  // Test class to write a series of values to the designated output stream
+  static class BasicUsageWriter extends StreamTester implements Runnable {
+    TestStreams streams;
+    boolean flushCout;
+    boolean closePoutEarly;
+
+    public BasicUsageWriter(TestStreams streams, boolean flushCout, boolean closePoutEarly) {
+      this.streams = streams;
+      this.flushCout = flushCout;
+      this.closePoutEarly = closePoutEarly;
+    }
+
+    @Override
+    public void run() {
+      try {
+        // Write the items to the output stream.
+        for (byte[] value : streams.values) {
+          streams.out.write(value, 0, value.length);
+        }
+
+        if (flushCout) {
+          streams.out.flush();
+        }
+        if (closePoutEarly) {
+          // Close the inner output stream before closing the outer output stream.
+          // For chunked output this means we don't write the end-of-data indicator.
+          streams.pout.close();
+        }
+        // This will throw an error if we closed pout early.
+        streams.out.close();
+      } catch (Exception err) {
+        err.printStackTrace();
+        this.error = err;
+      }
+    }
+  }
+
+  // Test class to read a series of values from the designated input stream
+  static class BasicUsageReader extends StreamTester implements Runnable {
+    TestStreams streams;
+    boolean allValuesRead = false;
+
+    public BasicUsageReader(TestStreams streams) {
+      this.streams = streams;
+    }
+
+    // Continue reading from the input stream until the desired number of bytes has been read
+    void readFully(InputStream in, byte[] readValue, int numBytes) throws IOException {
+      int bytesRead = 0;
+      while (bytesRead < numBytes) {
+        int read = in.read(readValue, bytesRead, numBytes - bytesRead);
+        if (read <= 0) {
+          throw new IOException("Unexpected read length " + read);
+        }
+        bytesRead += read;
+      }
+    }
+
+    @Override
+    public void run() {
+      try {
+        // Read the items from the input stream and confirm they match
+        for (byte[] value : streams.values) {
+          byte[] readValue = new byte[value.length];
+          readFully(streams.in, readValue, readValue.length);
+          assertArrayEquals(value, readValue);
+        }
+
+        allValuesRead = true;
+
+        // Check that the input is done
+        assertEquals(-1, streams.in.read());
+      } catch (Exception err) {
+        err.printStackTrace();
+        this.error = err;
+      }
+    }
+  }
+
+  static class MyFilterInputStream extends FilterInputStream {
+    public MyFilterInputStream(InputStream in) {
+      super(in);
+    }
+  }
+
+  // Helper class to set up a ChunkedInput/Output stream for testing
+  static class TestStreams {
+    PipedOutputStream pout;
+    OutputStream out;
+    PipedInputStream pin;
+    InputStream in;
+    List<byte[]> values;
+
+    public TestStreams(boolean useChunkedStream) throws Exception {
+      pout = new PipedOutputStream();
+      pin = new PipedInputStream(pout);
+      if (useChunkedStream) {
+        out = new ChunkedOutputStream(pout, bufferSize);
+        in = new ChunkedInputStream(pin);
+      } else {
+        // Test behavior with non-chunked streams
+        out = new FilterOutputStream(pout);
+        in = new MyFilterInputStream(pin);
+      }
+    }
+
+    public void close() {
+      try {
+        pout.close();
+      } catch (Exception err) {
+        // ignore
+      }
+
+      try {
+        pin.close();
+      } catch (Exception err) {
+        // ignore
+      }
+    }
+  }
+
+  static void runTest(Runnable writer, Runnable reader, TestStreams streams) throws Exception {
+    Thread writerThread = new Thread(writer);
+    Thread readerThread = new Thread(reader);
+
+    writerThread.start();
+    readerThread.start();
+
+    writerThread.join();
+    readerThread.join();
+  }
+
+  @Test
+  public void testBasicUsage() throws Exception {
+    List<byte[]> values = Arrays.asList(
+        new byte[]{(byte) 1},
+        new byte[]{(byte) 2},
+        RandomTypeUtil.getRandString(rand, alphabet, 99).getBytes(),
+        RandomTypeUtil.getRandString(rand, alphabet, 1024).getBytes()
+        );
+
+    // Try the basic test with non-chunked stream
+    TestStreams nonChunkedStreams = new TestStreams(false);
+    nonChunkedStreams.values = values;
+    BasicUsageWriter writer1 = new BasicUsageWriter(nonChunkedStreams, false, false);
+    BasicUsageReader reader1 = new BasicUsageReader(nonChunkedStreams);
+    runTest(writer1, reader1, nonChunkedStreams);
+    assertTrue(reader1.allValuesRead);
+    assertNull(writer1.getError());
+    assertNull(reader1.getError());
+
+    // Try with chunked streams
+    TestStreams chunkedStreams = new TestStreams(true);
+    chunkedStreams.values = values;
+    BasicUsageWriter writer2 = new BasicUsageWriter(chunkedStreams, false, false);
+    BasicUsageReader reader2 = new BasicUsageReader(chunkedStreams);
+    runTest(writer2, reader2, chunkedStreams);
+    assertTrue(reader2.allValuesRead);
+    assertTrue(((ChunkedInputStream) chunkedStreams.in).isEndOfData());
+    assertNull(writer2.getError());
+    assertNull(reader2.getError());
+  }
+
+  @Test
+  public void testAbruptlyClosedOutput() throws Exception {
+    List<byte[]> values = Arrays.asList(
+        new byte[]{(byte) 1},
+        new byte[]{(byte) 2},
+        RandomTypeUtil.getRandString(rand, alphabet, 99).getBytes(),
+        RandomTypeUtil.getRandString(rand, alphabet, 1024).getBytes()
+        );
+
+    // Close the PipedOutputStream before we close the outermost OutputStream.
+
+    // Try non-chunked stream. There should be no issues assuming we flushed the streams before closing.
+    TestStreams nonChunkedStreams = new TestStreams(false);
+    nonChunkedStreams.values = values;
+    BasicUsageWriter writer1 = new BasicUsageWriter(nonChunkedStreams, true, true);
+    BasicUsageReader reader1 = new BasicUsageReader(nonChunkedStreams);
+    runTest(writer1, reader1, nonChunkedStreams);
+    assertTrue(reader1.allValuesRead);
+    assertNull(writer1.getError());
+    assertNull(reader1.getError());
+
+    // Try with chunked stream. Here the chunked output didn't get a chance to write the end-of-data
+    // indicator, so the chunked input does not know to stop reading.
+    TestStreams chunkedStreams = new TestStreams(true);
+    chunkedStreams.values = values;
+    BasicUsageWriter writer2 = new BasicUsageWriter(chunkedStreams, true, true);
+    BasicUsageReader reader2 = new BasicUsageReader(chunkedStreams);
+    runTest(writer2, reader2, chunkedStreams);
+    assertTrue(reader2.allValuesRead);
+    assertFalse(((ChunkedInputStream) chunkedStreams.in).isEndOfData());
+    // Closing the chunked output stream early gives an error
+    assertNotNull(writer2.getError());
+    // In this case we should expect the test to have failed at the very last read() check.
+    assertNotNull(reader2.getError());
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 0619b79..c732ba1 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage;
+import org.apache.hadoop.hive.llap.io.ChunkedOutputStream;
 import org.apache.hadoop.hive.llap.security.SecretManager;
 
 import com.google.common.base.Preconditions;
@@ -198,7 +199,9 @@ private void registerReader(ChannelHandlerContext ctx, String id, byte[] tokenBy
         HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
     @SuppressWarnings("rawtypes")
     LlapRecordWriter writer = new LlapRecordWriter(
-        new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites));
+        new ChunkedOutputStream(
+            new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites),
+            sendBufferSize));
     boolean isFailed = true;
     synchronized (lock) {
       if (!writers.containsKey(id)) {
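
Note (not part of the patch): the following is a minimal, illustrative sketch of the framing the new streams agree on - each chunk is a 4-byte big-endian length followed by that many payload bytes, and a zero-length chunk marks end of data. It only uses the ChunkedOutputStream/ChunkedInputStream classes added above plus java.io; the ChunkedStreamDemo class name and the sample payload are made up for illustration.

package org.apache.hadoop.hive.llap.io;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;

public class ChunkedStreamDemo {
  public static void main(String[] args) throws Exception {
    ByteArrayOutputStream wire = new ByteArrayOutputStream();

    // Writer side: buffer size 4, so a 10-byte payload is framed as chunks of 4, 4 and 2 bytes.
    ChunkedOutputStream out = new ChunkedOutputStream(wire, 4);
    out.write("0123456789".getBytes("UTF-8"));
    out.close(); // flushes the last partial chunk, then appends the 0-length end-of-data chunk

    // Raw framing on the wire: [4]["0123"] [4]["4567"] [2]["89"] [0]
    DataInputStream frames = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
    int chunkLen;
    while ((chunkLen = frames.readInt()) > 0) {
      byte[] chunk = new byte[chunkLen];
      frames.readFully(chunk);
      System.out.println("chunk of " + chunkLen + " bytes: " + new String(chunk, "UTF-8"));
    }

    // Reader side: ChunkedInputStream reassembles the payload and flags the end-of-data marker.
    ChunkedInputStream in = new ChunkedInputStream(new ByteArrayInputStream(wire.toByteArray()));
    byte[] payload = new byte[10];
    int off = 0;
    int n;
    while (off < payload.length && (n = in.read(payload, off, payload.length - off)) > 0) {
      off += n;
    }
    System.out.println("payload: " + new String(payload, 0, off, "UTF-8"));
    System.out.println("end of data seen: " + (in.read() == -1 && in.isEndOfData()));
  }
}

This mirrors how LlapOutputFormatService wraps the per-fragment ChannelOutputStream in a ChunkedOutputStream and how LlapBaseRecordReader checks cin.isEndOfData() once hasInput() reports the stream is exhausted.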