diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index fc0c66a888..05282db163 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -296,6 +296,9 @@ private void repositionInStreams(TreeReaderFactory.TreeReader[] columnReaders, ConsumerStripeMetadata stripeMetadata) throws IOException { PositionProvider[] pps = createPositionProviders( columnReaders, batch.getBatchKey(), stripeMetadata); + if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { + LlapIoImpl.ORC_LOGGER.trace("Created pps {}", Arrays.toString(pps)); + } if (pps == null) return; for (int i = 0; i < columnReaders.length; i++) { TreeReader reader = columnReaders[i]; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 1d7eceb1ef..4e91a856d4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -435,13 +435,12 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, try { if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding) || index == null) { // This stream is for entire stripe and needed for every RG; uncompress once and reuse. - if (isTracingEnabled) { - LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for" - + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length); - } - trace.logStartStripeStream(sctx.kind); if (sctx.stripeLevelStream == null) { - sctx.stripeLevelStream = POOLS.csdPool.take(); + if (isTracingEnabled) { + LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for" + + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length); + } + trace.logStartStripeStream(sctx.kind); sctx.stripeLevelStream = POOLS.csdPool.take(); // We will be using this for each RG while also sending RGs to processing. // To avoid buffers being unlocked, run refcount one ahead; so each RG // processing will decref once, and the last one will unlock the buffers. diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java index 42532f9a0e..cabdaa54f6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java @@ -31,6 +31,7 @@ import org.apache.orc.CompressionCodec; import org.apache.orc.TypeDescription; import org.apache.orc.TypeDescription.Category; +import org.apache.orc.impl.InStream; import org.apache.orc.impl.PositionProvider; import org.apache.orc.impl.SettableUncompressedStream; import org.apache.orc.impl.TreeReaderFactory; @@ -213,6 +214,11 @@ public static StreamReaderBuilder builder() { } } + private static void skipCompressedIndex(boolean isCompressed, PositionProvider index) { + if (!isCompressed) return; + index.getNext(); + } + protected static class StringStreamReader extends StringTreeReader implements SettableTreeReader { private boolean _isFileCompressed; @@ -260,30 +266,30 @@ public void seek(PositionProvider index) throws IOException { // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. + skipCompressedIndex(_isFileCompressed, index); if (_dataStream != null && _dataStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } ((StringDictionaryTreeReader) reader).getReader().seek(index); - } + } // No need to skip seek here, index won't be used anymore. } else { // DIRECT encoding // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. + skipCompressedIndex(_isFileCompressed, index); + // TODO: why does the original code not just use _dataStream that it passes in as stream? + InStream stream = ((StringDirectTreeReader) reader).getStream(); + // TODO: not clear why this check and skipSeek are needed. if (_dataStream != null && _dataStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } - ((StringDirectTreeReader) reader).getStream().seek(index); + stream.seek(index); + } else { + assert stream == _dataStream; + _dataStream.skipSeek(index); } + skipCompressedIndex(_isFileCompressed, index); if (_lengthStream != null && _lengthStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } ((StringDirectTreeReader) reader).getLengths().seek(index); - } + } // No need to skip seek here, index won't be used anymore. } } @@ -830,10 +836,8 @@ public void seek(PositionProvider index) throws IOException { // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. + skipCompressedIndex(_isFileCompressed, index); if (_dataStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } stream.seek(index); } } @@ -945,10 +949,8 @@ public void seek(PositionProvider index) throws IOException { // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. + skipCompressedIndex(_isFileCompressed, index); if (_dataStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } stream.seek(index); } } @@ -1071,19 +1073,19 @@ public void seek(PositionProvider index) throws IOException { // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. + skipCompressedIndex(_isFileCompressed, index); + // TODO: not clear why this check and skipSeek are needed. if (_valueStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } valueStream.seek(index); + } else { + assert valueStream == _valueStream; + _valueStream.skipSeek(index); } + skipCompressedIndex(_isFileCompressed, index); if (_scaleStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } scaleReader.seek(index); - } + } // No need to skip seek here, index won't be used anymore. } @Override @@ -1375,30 +1377,29 @@ public void seek(PositionProvider index) throws IOException { // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. + skipCompressedIndex(_isFileCompressed, index); if (_dataStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } ((StringDictionaryTreeReader) reader).getReader().seek(index); - } + } // No need to skip seek here, index won't be used anymore. } else { // DIRECT encoding // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. + skipCompressedIndex(_isFileCompressed, index); + InStream stream = ((StringDirectTreeReader) reader).getStream(); + // TODO: not clear why this check and skipSeek are needed. if (_dataStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } - ((StringDirectTreeReader) reader).getStream().seek(index); + stream.seek(index); + } else { + assert stream == _dataStream; + _dataStream.skipSeek(index); } + skipCompressedIndex(_isFileCompressed, index); if (_lengthStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } ((StringDirectTreeReader) reader).getLengths().seek(index); - } + } // No need to skip seek here, index won't be used anymore. } } @@ -1574,30 +1575,29 @@ public void seek(PositionProvider index) throws IOException { // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. + skipCompressedIndex(_isFileCompressed, index); if (_dataStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } ((StringDictionaryTreeReader) reader).getReader().seek(index); - } + } // No need to skip seek here, index won't be used anymore. } else { // DIRECT encoding // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. + skipCompressedIndex(_isFileCompressed, index); + InStream stream = ((StringDirectTreeReader) reader).getStream(); + // TODO: not clear why this check and skipSeek are needed. if (_dataStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } - ((StringDirectTreeReader) reader).getStream().seek(index); + stream.seek(index); + } else { + assert stream == _dataStream; + _dataStream.skipSeek(index); } + skipCompressedIndex(_isFileCompressed, index); if (_lengthStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } ((StringDirectTreeReader) reader).getLengths().seek(index); - } + } // No need to skip seek here, index won't be used anymore. } } @@ -1885,19 +1885,19 @@ public void seek(PositionProvider index) throws IOException { // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. + skipCompressedIndex(_isFileCompressed, index); + // TODO: not clear why this check and skipSeek are needed. if (_dataStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } stream.seek(index); + } else { + assert stream == _dataStream; + _dataStream.skipSeek(index); } + skipCompressedIndex(_isFileCompressed, index); if (lengths != null && _lengthsStream.available() > 0) { - if (_isFileCompressed) { - index.getNext(); - } lengths.seek(index); - } + } // No need to skip seek here, index won't be used anymore. } @Override