Include the ORC file path in IOExceptions raised while reading rows.

RecordReaderImpl now keeps the Path it was opened with. The bodies of
next() and nextBatch() are wrapped in try/catch so that any IOException
is rethrown as new IOException("Error reading file: " + path, e), with the
original exception preserved as the cause.

Note for reviewers: next() must assign the value returned by
reader.next(previous) to result; otherwise the method returns null for
every row and the trace log prints "orc row = null".

diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 2745bee..aac1774 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -84,6 +84,7 @@
   private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
   private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();
 
+  private final Path path;
   private final FSDataInputStream file;
   private final long firstRow;
   private final List stripes =
@@ -265,6 +266,7 @@ static int findColumns(String[] columnNames,
                    long strideRate,
                    Configuration conf
                   ) throws IOException {
+    this.path = path;
     this.file = fileSystem.open(path);
     this.codec = codec;
     this.types = types;
@@ -3263,13 +3265,19 @@ private void advanceToNextRow(long nextRow) throws IOException {
 
   @Override
   public Object next(Object previous) throws IOException {
-    Object result = reader.next(previous);
-    // find the next row
-    rowInStripe += 1;
-    advanceToNextRow(rowInStripe + rowBaseInStripe);
-    if (isLogTraceEnabled) {
-      LOG.trace("row from " + reader.path);
-      LOG.trace("orc row = " + result);
+    Object result = null;
+    try {
+      result = reader.next(previous);
+      // find the next row
+      rowInStripe += 1;
+      advanceToNextRow(rowInStripe + rowBaseInStripe);
+      if (isLogTraceEnabled) {
+        LOG.trace("row from " + reader.path);
+        LOG.trace("orc row = " + result);
+      }
+    } catch (IOException e) {
+      // Rethrow exception with file name in log message
+      throw new IOException("Error reading file: " + path, e);
     }
     return result;
   }
@@ -3277,55 +3285,61 @@ public Object next(Object previous) throws IOException {
   @Override
   public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOException {
     VectorizedRowBatch result = null;
-    if (rowInStripe >= rowCountInStripe) {
-      currentStripe += 1;
-      readStripe();
-    }
-
-    long batchSize = 0;
+    try {
+      if (rowInStripe >= rowCountInStripe) {
+        currentStripe += 1;
+        readStripe();
+      }
+
+      long batchSize = 0;
+
+      // In case of PPD, batch size should be aware of row group boundaries. If only a subset of row
+      // groups are selected then marker position is set to the end of range (subset of row groups
+      // within strip). Batch size computed out of marker position makes sure that batch size is
+      // aware of row group boundary and will not cause overflow when reading rows
+      // illustration of this case is here https://issues.apache.org/jira/browse/HIVE-6287
+      if (rowIndexStride != 0 && includedRowGroups != null && rowInStripe < rowCountInStripe) {
+        int startRowGroup = (int) (rowInStripe / rowIndexStride);
+        if (!includedRowGroups[startRowGroup]) {
+          while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) {
+            startRowGroup += 1;
+          }
+        }
 
-    // In case of PPD, batch size should be aware of row group boundaries. If only a subset of row
-    // groups are selected then marker position is set to the end of range (subset of row groups
-    // within strip). Batch size computed out of marker position makes sure that batch size is
-    // aware of row group boundary and will not cause overflow when reading rows
-    // illustration of this case is here https://issues.apache.org/jira/browse/HIVE-6287
-    if (rowIndexStride != 0 && includedRowGroups != null && rowInStripe < rowCountInStripe) {
-      int startRowGroup = (int) (rowInStripe / rowIndexStride);
-      if (!includedRowGroups[startRowGroup]) {
-        while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) {
-          startRowGroup += 1;
+        int endRowGroup = startRowGroup;
+        while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
+          endRowGroup += 1;
         }
-      }
 
-      int endRowGroup = startRowGroup;
-      while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
-        endRowGroup += 1;
-      }
+        final long markerPosition =
+            (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
+                : rowCountInStripe;
+        batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (markerPosition - rowInStripe));
 
-      final long markerPosition = (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
-          : rowCountInStripe;
-      batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (markerPosition - rowInStripe));
+        if (LOG.isDebugEnabled() && batchSize < VectorizedRowBatch.DEFAULT_SIZE) {
+          LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
+        }
+      } else {
+        batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (rowCountInStripe - rowInStripe));
+      }
 
-      if (LOG.isDebugEnabled() && batchSize < VectorizedRowBatch.DEFAULT_SIZE) {
-        LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
+      rowInStripe += batchSize;
+      if (previous == null) {
+        ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
+        result = new VectorizedRowBatch(cols.length);
+        result.cols = cols;
+      } else {
+        result = previous;
+        result.selectedInUse = false;
+        reader.nextVector(result.cols, (int) batchSize);
       }
-    } else {
-      batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (rowCountInStripe - rowInStripe));
-    }
 
-    rowInStripe += batchSize;
-    if (previous == null) {
-      ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
-      result = new VectorizedRowBatch(cols.length);
-      result.cols = cols;
-    } else {
-      result = (VectorizedRowBatch) previous;
-      result.selectedInUse = false;
-      reader.nextVector(result.cols, (int) batchSize);
+      result.size = (int) batchSize;
+      advanceToNextRow(rowInStripe + rowBaseInStripe);
+    } catch (IOException e) {
+      // Rethrow exception with file name in log message
+      throw new IOException("Error reading file: " + path, e);
     }
-
-    result.size = (int) batchSize;
-    advanceToNextRow(rowInStripe + rowBaseInStripe);
     return result;
   }
 