diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 0876bf7..b849303 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -72,6 +72,7 @@
   private final boolean[] included;
   private final long rowIndexStride;
   private long rowInStripe = 0;
+  private long markerPosition = Long.MAX_VALUE;
   private int currentStripe = -1;
   private long rowBaseInStripe = 0;
   private long rowCountInStripe = 0;
@@ -2736,7 +2737,37 @@ public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOExcept
       readStripe();
     }
 
-    long batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (rowCountInStripe - rowInStripe));
+    long batchSize = 0;
+
+    // In case of PPD, batch size should be aware of row group boundaries. If only a subset of row
+    // groups is selected, the marker position is set to the end of that range (the selected row
+    // groups within the stripe). Computing the batch size from the marker position ensures it
+    // respects row group boundaries and will not overflow when reading rows.
+    if (rowIndexStride != 0 && includedRowGroups != null && rowInStripe < rowCountInStripe) {
+      int startRowGroup = (int) (rowInStripe / rowIndexStride);
+      if (!includedRowGroups[startRowGroup]) {
+        while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) {
+          startRowGroup += 1;
+        }
+      }
+
+      int endRowGroup = startRowGroup;
+      while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
+        endRowGroup += 1;
+      }
+
+      markerPosition = (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
+          : rowCountInStripe;
+      batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (markerPosition - rowInStripe));
+
+      // if the first row group is not selected then advance to the first selected row group
+      if (startRowGroup > 0) {
+        advanceToNextRow(startRowGroup * rowIndexStride);
+      }
+    } else {
+      batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (rowCountInStripe - rowInStripe));
+    }
+
     rowInStripe += batchSize;
     if (previous == null) {
       ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
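
Not part of the patch: a minimal standalone sketch of the batch-size computation above, using a hypothetical stripe layout (the stride, row count, and selected row groups below are made up for illustration; DEFAULT_SIZE mirrors VectorizedRowBatch.DEFAULT_SIZE, which is 1024 in Hive). It models only the marker-position arithmetic; the advanceToNextRow repositioning from the patch is omitted.

// Standalone sketch of the marker-position logic; stripe layout is hypothetical.
public class MarkerPositionSketch {
  private static final int DEFAULT_SIZE = 1024;   // VectorizedRowBatch.DEFAULT_SIZE

  // Returns the size of the next batch, bounded by the end of the current run of
  // selected row groups (the "marker position") so a batch never crosses into a
  // row group that predicate pushdown filtered out.
  static long nextBatchSize(long rowInStripe, long rowCountInStripe,
                            long rowIndexStride, boolean[] includedRowGroups) {
    if (rowIndexStride == 0 || includedRowGroups == null || rowInStripe >= rowCountInStripe) {
      return Math.min(DEFAULT_SIZE, rowCountInStripe - rowInStripe);
    }
    int startRowGroup = (int) (rowInStripe / rowIndexStride);
    while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) {
      startRowGroup += 1;                          // skip row groups removed by PPD
    }
    int endRowGroup = startRowGroup;
    while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
      endRowGroup += 1;                            // end of the contiguous selected run
    }
    long markerPosition = Math.min(endRowGroup * rowIndexStride, rowCountInStripe);
    return Math.min(DEFAULT_SIZE, markerPosition - rowInStripe);
  }

  public static void main(String[] args) {
    // Hypothetical stripe: 50,000 rows, stride 10,000 => 5 row groups; only groups 1 and 2 selected.
    boolean[] included = {false, true, true, false, false};
    long stride = 10000, rowCount = 50000;

    // Positioned at the start of row group 1: far from the marker, so a full batch is read.
    System.out.println(nextBatchSize(10000, rowCount, stride, included)); // 1024
    // Near the marker at row 30,000 the batch shrinks so it never crosses the boundary.
    System.out.println(nextBatchSize(29500, rowCount, stride, included)); // 500
  }
}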