diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java index 46ef866..181cb60 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.llap.io.api.impl; import java.io.IOException; -import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -42,10 +41,8 @@ import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer; import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataProducer; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; public class LlapIoImpl implements LlapIo { public static final Log LOG = LogFactory.getLog(LlapIoImpl.class); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java index cec4653..e07485e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java @@ -19,18 +19,16 @@ package org.apache.hadoop.hive.llap.io.decode; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.concurrent.ExecutorService; import org.apache.hadoop.hive.llap.Consumer; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; -import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer; import org.apache.hadoop.hive.llap.io.encoded.EncodedDataReader; +import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataProducer; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.mapred.InputSplit; @@ -68,150 +66,6 @@ public void onFailure(Throwable t) { } } - private class EncodedDataConsumer implements ConsumerFeedback, - Consumer> { - private volatile boolean isStopped = false; - // TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb. - private final HashMap> pendingData = - new HashMap>(); - private ConsumerFeedback upstreamFeedback; - private final Consumer downstreamConsumer; - private final int colCount; - - public EncodedDataConsumer(Consumer consumer, int colCount) { - this.downstreamConsumer = consumer; - this.colCount = colCount; - } - - public void init(ConsumerFeedback upstreamFeedback) { - this.upstreamFeedback = upstreamFeedback; - } - - @Override - public void consumeData(EncodedColumnBatch data) { - EncodedColumnBatch targetBatch = null; - boolean localIsStopped = false; - synchronized (pendingData) { - localIsStopped = isStopped; - if (!localIsStopped) { - targetBatch = pendingData.get(data.batchKey); - if (targetBatch == null) { - targetBatch = data; - pendingData.put(data.batchKey, data); - } - } - } - if (localIsStopped) { - returnProcessed(data.columnData); - return; - } - - synchronized (targetBatch) { - // Check if we are stopped and the batch was already cleaned. 
- localIsStopped = (targetBatch.columnData == null); - if (!localIsStopped) { - if (targetBatch != data) { - targetBatch.merge(data); - } - if (0 == targetBatch.colsRemaining) { - synchronized (pendingData) { - targetBatch = isStopped ? null : pendingData.remove(data.batchKey); - } - // Check if we are stopped and the batch had been removed from map. - localIsStopped = (targetBatch == null); - // We took the batch out of the map. No more contention with stop possible. - } - } - } - if (localIsStopped) { - returnProcessed(data.columnData); - return; - } - if (0 == targetBatch.colsRemaining) { - ColumnVectorProducer.this.decodeBatch(targetBatch, downstreamConsumer); - // Batch has been decoded; unlock the buffers in cache - returnProcessed(targetBatch.columnData); - } - } - - private void returnProcessed(StreamBuffer[][] data) { - for (StreamBuffer[] sbs : data) { - for (StreamBuffer sb : sbs) { - if (sb.decRef() != 0) continue; - upstreamFeedback.returnData(sb); - } - } - } - - @Override - public void setDone() { - synchronized (pendingData) { - if (!pendingData.isEmpty()) { - throw new AssertionError("Not all data has been sent downstream: " + pendingData.size()); - } - } - downstreamConsumer.setDone(); - } - - - @Override - public void setError(Throwable t) { - downstreamConsumer.setError(t); - dicardPendingData(false); - } - - @Override - public void returnData(ColumnVectorBatch data) { - // TODO: column vectors could be added to object pool here - } - - private void dicardPendingData(boolean isStopped) { - List> batches = new ArrayList>( - pendingData.size()); - synchronized (pendingData) { - if (isStopped) { - this.isStopped = true; - } - batches.addAll(pendingData.values()); - pendingData.clear(); - } - List dataToDiscard = new ArrayList(batches.size() * colCount * 2); - for (EncodedColumnBatch batch : batches) { - synchronized (batch) { - for (StreamBuffer[] bb : batch.columnData) { - for (StreamBuffer b : bb) { - dataToDiscard.add(b); - } - } - batch.columnData = null; - } - } - for (StreamBuffer data : dataToDiscard) { - if (data.decRef() == 0) { - upstreamFeedback.returnData(data); - } - } - } - - @Override - public void stop() { - upstreamFeedback.stop(); - dicardPendingData(true); - } - - @Override - public void pause() { - // We are just a relay; send pause to encoded data producer. - upstreamFeedback.pause(); - } - - @Override - public void unpause() { - // We are just a relay; send unpause to encoded data producer. - upstreamFeedback.unpause(); - } - } - /** * Reads ColumnVectorBatch-es. * @param consumer Consumer that will receive the batches asynchronously. @@ -222,10 +76,15 @@ public void unpause() { public ConsumerFeedback read(InputSplit split, List columnIds, SearchArgument sarg, String[] columnNames, Consumer consumer) throws IOException { - // Create the consumer of encoded data; it will coordinate decoding to CVBs. - final EncodedDataConsumer edc = new EncodedDataConsumer(consumer, columnIds.size()); // Get the source of encoded data. EncodedDataProducer edp = getEncodedDataProducer(); + // Create the consumer of encoded data; it will coordinate decoding to CVBs. + final EncodedDataConsumer edc; + if (edp instanceof OrcEncodedDataProducer) { + edc = new OrcEncodedDataConsumer(this, consumer, columnIds.size()); + } else { + edc = new EncodedDataConsumer(this, consumer, columnIds.size()); + } // Then, get the specific reader of encoded data out of the producer. 
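The read() method above now chooses the consumer implementation based on the concrete encoded-data producer. A minimal, self-contained sketch of that dispatch pattern; the types below are illustrative stand-ins, not the actual LLAP classes:

// Stand-in types for the producer/consumer dispatch in read().
interface EncodedDataProducerSketch {}
class OrcEncodedDataProducerSketch implements EncodedDataProducerSketch {}

class EncodedDataConsumerSketch {
  protected final int colCount;
  EncodedDataConsumerSketch(int colCount) { this.colCount = colCount; }
}
class OrcEncodedDataConsumerSketch extends EncodedDataConsumerSketch {
  OrcEncodedDataConsumerSketch(int colCount) { super(colCount); }
}

class ConsumerDispatch {
  // Mirrors the instanceof check: an ORC producer gets an ORC-aware consumer
  // that can hold on to per-stripe state (the TreeReaders) between batches.
  static EncodedDataConsumerSketch forProducer(EncodedDataProducerSketch edp, int colCount) {
    return (edp instanceof OrcEncodedDataProducerSketch)
        ? new OrcEncodedDataConsumerSketch(colCount)
        : new EncodedDataConsumerSketch(colCount);
  }
}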
EncodedDataReader reader = edp.createReader( split, columnIds, sarg, columnNames, edc); @@ -241,6 +100,7 @@ public void unpause() { protected abstract EncodedDataProducer getEncodedDataProducer(); - protected abstract void decodeBatch(EncodedColumnBatch batch, + protected abstract void decodeBatch(EncodedDataConsumer batchKeyEncodedDataConsumer, + EncodedColumnBatch batch, Consumer downstreamConsumer); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java new file mode 100644 index 0000000..adcf6b0 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.io.decode; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.hadoop.hive.llap.Consumer; +import org.apache.hadoop.hive.llap.ConsumerFeedback; +import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; + +/** + * + */ +public class EncodedDataConsumer implements ConsumerFeedback, + Consumer> { + private volatile boolean isStopped = false; + // TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb. + private final HashMap> pendingData = new HashMap<>(); + private ConsumerFeedback upstreamFeedback; + private final Consumer downstreamConsumer; + private final int colCount; + private ColumnVectorProducer cvp; + + public EncodedDataConsumer(ColumnVectorProducer cvp, + Consumer consumer, int colCount) { + this.downstreamConsumer = consumer; + this.colCount = colCount; + this.cvp = cvp; + } + + public void init(ConsumerFeedback upstreamFeedback) { + this.upstreamFeedback = upstreamFeedback; + } + + @Override + public void consumeData(EncodedColumnBatch data) { + EncodedColumnBatch targetBatch = null; + boolean localIsStopped = false; + synchronized (pendingData) { + localIsStopped = isStopped; + if (!localIsStopped) { + targetBatch = pendingData.get(data.batchKey); + if (targetBatch == null) { + targetBatch = data; + pendingData.put(data.batchKey, data); + } + } + } + if (localIsStopped) { + returnProcessed(data.columnData); + return; + } + + synchronized (targetBatch) { + // Check if we are stopped and the batch was already cleaned. + localIsStopped = (targetBatch.columnData == null); + if (!localIsStopped) { + if (targetBatch != data) { + targetBatch.merge(data); + } + if (0 == targetBatch.colsRemaining) { + synchronized (pendingData) { + targetBatch = isStopped ? 
null : pendingData.remove(data.batchKey); + } + // Check if we are stopped and the batch had been removed from map. + localIsStopped = (targetBatch == null); + // We took the batch out of the map. No more contention with stop possible. + } + } + } + if (localIsStopped) { + returnProcessed(data.columnData); + return; + } + if (0 == targetBatch.colsRemaining) { + cvp.decodeBatch(this, targetBatch, downstreamConsumer); + // Batch has been decoded; unlock the buffers in cache + returnProcessed(targetBatch.columnData); + } + } + + protected void returnProcessed(EncodedColumnBatch.StreamBuffer[][] data) { + for (EncodedColumnBatch.StreamBuffer[] sbs : data) { + for (EncodedColumnBatch.StreamBuffer sb : sbs) { + if (sb.decRef() != 0) continue; + upstreamFeedback.returnData(sb); + } + } + } + + @Override + public void setDone() { + synchronized (pendingData) { + if (!pendingData.isEmpty()) { + throw new AssertionError("Not all data has been sent downstream: " + pendingData.size()); + } + } + downstreamConsumer.setDone(); + } + + + @Override + public void setError(Throwable t) { + downstreamConsumer.setError(t); + dicardPendingData(false); + } + + @Override + public void returnData(ColumnVectorBatch data) { + // TODO: column vectors could be added to object pool here + } + + private void dicardPendingData(boolean isStopped) { + List> batches = new ArrayList>( + pendingData.size()); + synchronized (pendingData) { + if (isStopped) { + this.isStopped = true; + } + batches.addAll(pendingData.values()); + pendingData.clear(); + } + List dataToDiscard = new ArrayList(batches.size() * colCount * 2); + for (EncodedColumnBatch batch : batches) { + synchronized (batch) { + for (EncodedColumnBatch.StreamBuffer[] bb : batch.columnData) { + for (EncodedColumnBatch.StreamBuffer b : bb) { + dataToDiscard.add(b); + } + } + batch.columnData = null; + } + } + for (EncodedColumnBatch.StreamBuffer data : dataToDiscard) { + if (data.decRef() == 0) { + upstreamFeedback.returnData(data); + } + } + } + + @Override + public void stop() { + upstreamFeedback.stop(); + dicardPendingData(true); + } + + @Override + public void pause() { + // We are just a relay; send pause to encoded data producer. + upstreamFeedback.pause(); + } + + @Override + public void unpause() { + // We are just a relay; send unpause to encoded data producer. 
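Once a batch is fully merged and decoded, the consumer above hands its cache buffers back upstream through reference counting (returnProcessed / decRef). A simplified sketch of that release pattern; RefCountedBuffer and Upstream are hypothetical stand-ins, not the real StreamBuffer or ConsumerFeedback types:

// Buffers are shared by several readers via a refcount and are handed back
// to the upstream cache only when the last holder releases them.
class RefCountedBuffer {
  private final java.util.concurrent.atomic.AtomicInteger refs =
      new java.util.concurrent.atomic.AtomicInteger(1);
  int decRef() { return refs.decrementAndGet(); }
}

interface Upstream { void returnData(RefCountedBuffer b); }

class BufferRelease {
  static void returnProcessed(RefCountedBuffer[][] data, Upstream upstream) {
    for (RefCountedBuffer[] column : data) {
      for (RefCountedBuffer buffer : column) {
        if (buffer.decRef() != 0) continue;   // still referenced elsewhere
        upstream.returnData(buffer);          // last reference: unlock in cache
      }
    }
  }
}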
+ upstreamFeedback.unpause(); + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java index 824d9c4..5e07aec 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.BinaryStreamReader; import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.BooleanStreamReader; import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.ByteStreamReader; @@ -53,46 +54,55 @@ import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; public class OrcColumnVectorProducer extends ColumnVectorProducer { - private final OrcEncodedDataProducer edp; - private final OrcMetadataCache metadataCache; - private boolean skipCorrupt; + private final OrcEncodedDataProducer _edp; + private final OrcMetadataCache _metadataCache; + private boolean _skipCorrupt; + private int _previousStripeIndex; - public OrcColumnVectorProducer( - ExecutorService executor, OrcEncodedDataProducer edp, Configuration conf) { + public OrcColumnVectorProducer(ExecutorService executor, OrcEncodedDataProducer edp, + Configuration conf) { super(executor); if (LlapIoImpl.LOGL.isInfoEnabled()) { LlapIoImpl.LOG.info("Initializing ORC column vector producer"); } - this.edp = edp; - this.metadataCache = OrcMetadataCache.getInstance(); - this.skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA); + this._edp = edp; + this._metadataCache = OrcMetadataCache.getInstance(); + this._skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA); + this._previousStripeIndex = -1; } @Override protected EncodedDataProducer getEncodedDataProducer() { - return edp; + return _edp; } @Override - protected void decodeBatch(EncodedColumnBatch batch, + protected void decodeBatch(EncodedDataConsumer edc, + EncodedColumnBatch batch, Consumer downstreamConsumer) { + OrcEncodedDataConsumer oedc = (OrcEncodedDataConsumer) edc; String fileName = batch.batchKey.file; + int currentStripeIndex = batch.batchKey.stripeIx; + if (_previousStripeIndex == -1) { + _previousStripeIndex = currentStripeIndex; + } + boolean sameStripe = currentStripeIndex == _previousStripeIndex; // OrcEncodedDataProducer should have just loaded cache entries from this file. // The default LRU algorithm shouldn't have dropped the entries. To make it // safe, untie the code from EDP into separate class and make use of loading cache. The current // assumption is that entries for the current file exists in metadata cache. try { - OrcFileMetadata fileMetadata = metadataCache.getFileMetadata(fileName); + OrcFileMetadata fileMetadata = _metadataCache.getFileMetadata(fileName); OrcBatchKey stripeKey = batch.batchKey.clone(); // To get stripe metadata we only need to know the stripe number. Oddly, stripe metadata - // accepts BatchKey as key. We need to keep to row group index in batch key the same to + // accepts BatchKey as key. We need to keep the row group index in batch key the same to // retrieve the stripe metadata properly. 
To make sure we get the correct stripe // metadata, set row group index to 0. That's how it is cached. See OrcEncodedDataProducer stripeKey.rgIx = 0; - OrcStripeMetadata stripeMetadata = metadataCache.getStripeMetadata(stripeKey); + OrcStripeMetadata stripeMetadata = _metadataCache.getStripeMetadata(stripeKey); // Get non null row count from root column, to get max vector batches int rgIdx = batch.batchKey.rgIx; @@ -101,8 +111,15 @@ protected void decodeBatch(EncodedColumnBatch batch, int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1); int batchSize = VectorizedRowBatch.DEFAULT_SIZE; int numCols = batch.columnIxs.length; - RecordReaderImpl.TreeReader[] columnReaders = createTreeReaders(numCols, batch, fileMetadata, - stripeMetadata); + if (oedc.getColumnReaders() == null || !sameStripe) { + RecordReaderImpl.TreeReader[] columnReaders = createTreeReaders(numCols, batch, + fileMetadata, stripeMetadata); + oedc.setColumnReaders(columnReaders); + } else { + repositionInStreams(oedc.getColumnReaders(), batch, sameStripe, numCols, fileMetadata, + stripeMetadata); + } + _previousStripeIndex = currentStripeIndex; for (int i = 0; i < maxBatchesRG; i++) { ColumnVectorBatch cvb = new ColumnVectorBatch(batch.columnIxs.length); @@ -114,7 +131,7 @@ protected void decodeBatch(EncodedColumnBatch batch, } for (int idx = 0; idx < batch.columnIxs.length; idx++) { - cvb.cols[idx] = (ColumnVector) columnReaders[idx].nextVector(null, batchSize); + cvb.cols[idx] = (ColumnVector) oedc.getColumnReaders()[idx].nextVector(null, batchSize); } // we are done reading a batch, send it to consumer for processing @@ -144,7 +161,6 @@ protected void decodeBatch(EncodedColumnBatch batch, // then 1st position (compressed offset) in row index should be skipped to get // uncompressed offset, else 1st position should not be skipped. 
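The comment above describes the seek convention used throughout this patch: for a compressed file, each row-index position begins with a compressed-block offset, which the cached (already decompressed) streams must consume and discard before reading the in-stream offset. A small sketch of that rule, where PositionTracker is a hypothetical stand-in for ORC's PositionProvider:

// Hands out the recorded row-index positions one value at a time.
class PositionTracker {
  private final long[] positions;
  private int next = 0;
  PositionTracker(long... positions) { this.positions = positions; }
  long getNext() { return positions[next++]; }
}

class SeekSketch {
  // For a compressed file the index holds (compressedOffset, uncompressedOffset, ...);
  // cached streams are already decompressed, so the first entry is skipped.
  static long uncompressedSeekPosition(PositionTracker index, boolean isFileCompressed) {
    if (isFileCompressed) {
      index.getNext();        // discard the compressed-block offset
    }
    return index.getNext();   // offset within the uncompressed stream data
  }
}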
CompressionCodec codec = fileMetadata.getCompressionCodec(); - int bufferSize = fileMetadata.getCompressionBufferSize(); OrcProto.ColumnEncoding columnEncoding = stripeMetadata.getEncodings().get(columnIndex); OrcProto.RowIndex rowIndex = stripeMetadata.getRowIndexes()[columnIndex]; OrcProto.RowIndexEntry rowIndexEntry = rowIndex.getEntry(rowGroupIndex); @@ -191,8 +207,6 @@ protected void decodeBatch(EncodedColumnBatch batch, .setDataStream(data) .setLengthStream(lengths) .setCompressionCodec(codec) - .setBufferSize(bufferSize) - .setRowIndex(rowIndexEntry) .setColumnEncoding(columnEncoding) .build(); break; @@ -203,8 +217,6 @@ protected void decodeBatch(EncodedColumnBatch batch, .setPresentStream(present) .setDataStream(data) .setCompressionCodec(codec) - .setBufferSize(bufferSize) - .setRowIndex(rowIndexEntry) .build(); break; case BYTE: @@ -214,8 +226,6 @@ protected void decodeBatch(EncodedColumnBatch batch, .setPresentStream(present) .setDataStream(data) .setCompressionCodec(codec) - .setBufferSize(bufferSize) - .setRowIndex(rowIndexEntry) .build(); break; case SHORT: @@ -225,8 +235,6 @@ protected void decodeBatch(EncodedColumnBatch batch, .setPresentStream(present) .setDataStream(data) .setCompressionCodec(codec) - .setBufferSize(bufferSize) - .setRowIndex(rowIndexEntry) .setColumnEncoding(columnEncoding) .build(); break; @@ -237,8 +245,6 @@ protected void decodeBatch(EncodedColumnBatch batch, .setPresentStream(present) .setDataStream(data) .setCompressionCodec(codec) - .setBufferSize(bufferSize) - .setRowIndex(rowIndexEntry) .setColumnEncoding(columnEncoding) .build(); break; @@ -249,10 +255,8 @@ protected void decodeBatch(EncodedColumnBatch batch, .setPresentStream(present) .setDataStream(data) .setCompressionCodec(codec) - .setBufferSize(bufferSize) - .setRowIndex(rowIndexEntry) .setColumnEncoding(columnEncoding) - .skipCorrupt(skipCorrupt) + .skipCorrupt(_skipCorrupt) .build(); break; case FLOAT: @@ -262,8 +266,6 @@ protected void decodeBatch(EncodedColumnBatch batch, .setPresentStream(present) .setDataStream(data) .setCompressionCodec(codec) - .setBufferSize(bufferSize) - .setRowIndex(rowIndexEntry) .build(); break; case DOUBLE: @@ -273,8 +275,6 @@ protected void decodeBatch(EncodedColumnBatch batch, .setPresentStream(present) .setDataStream(data) .setCompressionCodec(codec) - .setBufferSize(bufferSize) - .setRowIndex(rowIndexEntry) .build(); break; case CHAR: @@ -289,8 +289,6 @@ protected void decodeBatch(EncodedColumnBatch batch, .setLengthStream(lengths) .setDictionaryStream(dictionary) .setCompressionCodec(codec) - .setBufferSize(bufferSize) - .setRowIndex(rowIndexEntry) .setColumnEncoding(columnEncoding) .build(); break; @@ -303,8 +301,6 @@ protected void decodeBatch(EncodedColumnBatch batch, .setLengthStream(lengths) .setDictionaryStream(dictionary) .setCompressionCodec(codec) - .setBufferSize(bufferSize) - .setRowIndex(rowIndexEntry) .setColumnEncoding(columnEncoding) .build(); break; @@ -318,8 +314,6 @@ protected void decodeBatch(EncodedColumnBatch batch, .setValueStream(data) .setScaleStream(secondary) .setCompressionCodec(codec) - .setBufferSize(bufferSize) - .setRowIndex(rowIndexEntry) .setColumnEncoding(columnEncoding) .build(); break; @@ -331,10 +325,8 @@ protected void decodeBatch(EncodedColumnBatch batch, .setSecondsStream(data) .setNanosStream(secondary) .setCompressionCodec(codec) - .setBufferSize(bufferSize) - .setRowIndex(rowIndexEntry) .setColumnEncoding(columnEncoding) - .skipCorrupt(skipCorrupt) + .skipCorrupt(_skipCorrupt) .build(); break; case DATE: @@ 
-344,19 +336,114 @@ protected void decodeBatch(EncodedColumnBatch batch, .setPresentStream(present) .setDataStream(data) .setCompressionCodec(codec) - .setBufferSize(bufferSize) - .setRowIndex(rowIndexEntry) .setColumnEncoding(columnEncoding) .build(); break; default: throw new UnsupportedOperationException("Data type not supported yet! " + columnType); } + treeReaders[i].seek(StreamUtils.getPositionProvider(rowIndexEntry)); } return treeReaders; } + private void repositionInStreams(RecordReaderImpl.TreeReader[] columnReaders, + EncodedColumnBatch batch, boolean sameStripe, int numCols, + OrcFileMetadata fileMetadata, OrcStripeMetadata stripeMetadata) throws IOException { + for (int i = 0; i < numCols; i++) { + int columnIndex = batch.columnIxs[i]; + int rowGroupIndex = batch.batchKey.rgIx; + EncodedColumnBatch.StreamBuffer[] streamBuffers = batch.columnData[i]; + OrcProto.Type columnType = fileMetadata.getTypes().get(columnIndex); + OrcProto.RowIndex rowIndex = stripeMetadata.getRowIndexes()[columnIndex]; + OrcProto.RowIndexEntry rowIndexEntry = rowIndex.getEntry(rowGroupIndex); + + // stream buffers are arranged in enum order of stream kind + EncodedColumnBatch.StreamBuffer present = null; + EncodedColumnBatch.StreamBuffer data = null; + EncodedColumnBatch.StreamBuffer dictionary = null; + EncodedColumnBatch.StreamBuffer lengths = null; + EncodedColumnBatch.StreamBuffer secondary = null; + for (EncodedColumnBatch.StreamBuffer streamBuffer : streamBuffers) { + switch(streamBuffer.streamKind) { + case 0: + // PRESENT stream + present = streamBuffer; + break; + case 1: + // DATA stream + data = streamBuffer; + break; + case 2: + // LENGTH stream + lengths = streamBuffer; + break; + case 3: + // DICTIONARY_DATA stream + dictionary = streamBuffer; + break; + case 5: + // SECONDARY stream + secondary = streamBuffer; + break; + default: + throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind); + } + } + + switch (columnType.getKind()) { + case BINARY: + ((BinaryStreamReader)columnReaders[i]).setBuffers(present, data, lengths); + break; + case BOOLEAN: + ((BooleanStreamReader)columnReaders[i]).setBuffers(present, data); + break; + case BYTE: + ((ByteStreamReader)columnReaders[i]).setBuffers(present, data); + break; + case SHORT: + ((ShortStreamReader)columnReaders[i]).setBuffers(present, data); + break; + case INT: + ((IntStreamReader)columnReaders[i]).setBuffers(present, data); + break; + case LONG: + ((LongStreamReader)columnReaders[i]).setBuffers(present, data); + break; + case FLOAT: + ((FloatStreamReader)columnReaders[i]).setBuffers(present, data); + break; + case DOUBLE: + ((DoubleStreamReader)columnReaders[i]).setBuffers(present, data); + break; + case CHAR: + case VARCHAR: + ((CharacterStreamReader)columnReaders[i]).setBuffers(present, data, lengths, dictionary, + sameStripe); + break; + case STRING: + ((StringStreamReader)columnReaders[i]).setBuffers(present, data, lengths, dictionary, + sameStripe); + break; + case DECIMAL: + ((DecimalStreamReader)columnReaders[i]).setBuffers(present, data, secondary); + break; + case TIMESTAMP: + ((TimestampStreamReader)columnReaders[i]).setBuffers(present, data, secondary); + break; + case DATE: + ((DateStreamReader)columnReaders[i]).setBuffers(present, data); + break; + default: + throw new UnsupportedOperationException("Data type not supported yet! 
" + columnType); + } + + columnReaders[i].seek(StreamUtils.getPositionProvider(rowIndexEntry)); + } + } + private long getRowCount(OrcProto.RowIndexEntry rowIndexEntry) { return rowIndexEntry.getStatistics().getNumberOfValues(); } + } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java new file mode 100644 index 0000000..65cf1c0 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.io.decode; + +import org.apache.hadoop.hive.llap.Consumer; +import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; + +/** + * + */ +public class OrcEncodedDataConsumer extends EncodedDataConsumer { + private ColumnVectorProducer cvp; + private Consumer consumer; + private RecordReaderImpl.TreeReader[] columnReaders; + + public OrcEncodedDataConsumer(ColumnVectorProducer cvp, Consumer consumer, int colCount) { + super(cvp, consumer, colCount); + this.cvp = cvp; + this.consumer = consumer; + this.columnReaders = null; + } + + public void setColumnReaders(RecordReaderImpl.TreeReader[] columnReaders) { + this.columnReaders = columnReaders; + } + + public RecordReaderImpl.TreeReader[] getColumnReaders() { + return columnReaders; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/SettableUncompressedStream.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/SettableUncompressedStream.java new file mode 100644 index 0000000..3b99c50 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/SettableUncompressedStream.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.llap.io.decode.orc.stream; + +import java.util.List; + +import org.apache.hadoop.hive.common.DiskRange; +import org.apache.hadoop.hive.ql.io.orc.InStream; + +/** + * + */ +public class SettableUncompressedStream extends InStream.UncompressedStream { + + public SettableUncompressedStream(String fileName, String name, + List input, long length) { + super(fileName, name, input, length); + } + + public void setBuffers(List input, long length) { + this.bytes = input; + this.length = length; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/StreamUtils.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/StreamUtils.java index 5021141..aeb8e33 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/StreamUtils.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/StreamUtils.java @@ -21,15 +21,14 @@ import java.nio.ByteBuffer; import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; -import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; +import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; import com.google.common.collect.Lists; -import com.google.common.primitives.Longs; /** * Stream utility. @@ -37,36 +36,23 @@ public class StreamUtils { /** - * Create InStream from stream buffer. + * Create LlapInStream from stream buffer. * * @param streamName - stream name * @param fileName - file name - * @param codec - compression codec - * @param bufferSize - compression buffer size * @param streamBuffer - stream buffer - * @return - InStream + * @return - LlapInStream * @throws IOException */ - public static InStream createInStream(String streamName, String fileName, CompressionCodec codec, - int bufferSize, EncodedColumnBatch.StreamBuffer streamBuffer) throws IOException { + public static SettableUncompressedStream createLlapInStream(String streamName, String fileName, + EncodedColumnBatch.StreamBuffer streamBuffer) throws IOException { if (streamBuffer == null) { return null; } - int numBuffers = streamBuffer.cacheBuffers.size(); - List input = Lists.newArrayList(); - List offsetsList = Lists.newArrayList(); - long totalLength = 0; - for (int i = 0; i < numBuffers; i++) { - ByteBuffer data = streamBuffer.cacheBuffers.get(i).byteBuffer.duplicate(); - input.add(data); - // offset start at where previous stream buffer left off - offsetsList.add(totalLength); - totalLength += data.remaining(); - } - ByteBuffer[] buffers = input.toArray(new ByteBuffer[input.size()]); - long[] offsets = Longs.toArray(offsetsList); - return InStream.create(fileName, streamName, buffers, offsets, totalLength, codec, bufferSize); + List diskRanges = Lists.newArrayList(); + long totalLength = createDiskRanges(streamBuffer, diskRanges); + return new SettableUncompressedStream(fileName, streamName, diskRanges, totalLength); } /** @@ -79,4 +65,23 @@ public static PositionProvider getPositionProvider(OrcProto.RowIndexEntry rowInd PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex); return positionProvider; } + + /** + * Converts stream buffers to disk ranges. 
+ * @param streamBuffer - stream buffer + * @param diskRanges - initial empty list of disk ranges + * @return - total length of disk ranges + */ + public static long createDiskRanges(EncodedColumnBatch.StreamBuffer streamBuffer, + List diskRanges) { + long totalLength = 0; + for (LlapMemoryBuffer memoryBuffer : streamBuffer.cacheBuffers) { + ByteBuffer buffer = memoryBuffer.byteBuffer.duplicate(); + RecordReaderImpl.BufferChunk bufferChunk = new RecordReaderImpl.BufferChunk(buffer, + totalLength); + diskRanges.add(bufferChunk); + totalLength += buffer.remaining(); + } + return totalLength; + } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BinaryStreamReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BinaryStreamReader.java index 5a636f6..49fdf12 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BinaryStreamReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BinaryStreamReader.java @@ -18,49 +18,84 @@ package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream; import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import com.google.common.collect.Lists; + + /** * Stream reader for binary column type. */ public class BinaryStreamReader extends RecordReaderImpl.BinaryTreeReader { - private boolean isFileCompressed; - - private BinaryStreamReader(int columnId, InStream present, - InStream data, InStream length, boolean isFileCompressed, - OrcProto.ColumnEncoding encoding, OrcProto.RowIndexEntry rowIndex) throws IOException { + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _dataStream; + private SettableUncompressedStream _lengthsStream; + + private BinaryStreamReader(int columnId, SettableUncompressedStream present, + SettableUncompressedStream data, SettableUncompressedStream length, boolean isFileCompressed, + OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present, data, length, encoding); - this.isFileCompressed = isFileCompressed; - - // position the readers based on the specified row index - seek(StreamUtils.getPositionProvider(rowIndex)); + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + this._dataStream = data; + this._lengthsStream = length; } @Override public void seek(PositionProvider index) throws IOException { if (present != null) { - if (isFileCompressed) { + if (_isFileCompressed) { index.getNext(); } present.seek(index); } - if (isFileCompressed) { - index.getNext(); + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. 
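The stream readers below guard every re-seek with an available() check, because a column that is entirely null (or whose trailing row groups are all null) can leave its data stream empty or already exhausted. A self-contained sketch of that guard, using plain java.io streams instead of the ORC stream classes:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

class NullAwareSeekSketch {
  // Seek the data stream only if it actually has bytes left; an all-null column
  // can legitimately produce an empty (or fully consumed) data stream.
  static void maybeSeek(InputStream dataStream, Runnable seekAction) throws IOException {
    if (dataStream != null && dataStream.available() > 0) {
      seekAction.run();
    }
  }

  public static void main(String[] args) throws IOException {
    InputStream empty = new ByteArrayInputStream(new byte[0]);
    maybeSeek(empty, () -> { throw new AssertionError("should not seek an empty stream"); });
  }
}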
+ if (_dataStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + stream.seek(index); } - stream.seek(index); - if (isFileCompressed) { - index.getNext(); + if (lengths != null && _lengthsStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + lengths.seek(index); + } + } + + public void setBuffers(EncodedColumnBatch.StreamBuffer presentStreamBuffer, + EncodedColumnBatch.StreamBuffer dataStreamBuffer, + EncodedColumnBatch.StreamBuffer lengthStreamBuffer) { + long length; + if (_presentStream != null) { + List presentDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(presentStreamBuffer, presentDiskRanges); + _presentStream.setBuffers(presentDiskRanges, length); + } + if (_dataStream != null) { + List dataDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(dataStreamBuffer, dataDiskRanges); + _dataStream.setBuffers(dataDiskRanges, length); + } + if (_lengthsStream != null) { + List lengthDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(lengthStreamBuffer, lengthDiskRanges); + _lengthsStream.setBuffers(lengthDiskRanges, length); } - lengths.seek(index); } public static class StreamReaderBuilder { @@ -70,8 +105,6 @@ public void seek(PositionProvider index) throws IOException { private EncodedColumnBatch.StreamBuffer dataStream; private EncodedColumnBatch.StreamBuffer lengthStream; private CompressionCodec compressionCodec; - private int bufferSize; - private OrcProto.RowIndexEntry rowIndex; private OrcProto.ColumnEncoding columnEncoding; public StreamReaderBuilder setFileName(String fileName) { @@ -104,34 +137,24 @@ public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec return this; } - public StreamReaderBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) { - this.rowIndex = rowIndex; - return this; - } - public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { this.columnEncoding = encoding; return this; } public BinaryStreamReader build() throws IOException { - InStream present = StreamUtils.createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, - null, bufferSize, presentStream); + SettableUncompressedStream present = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.PRESENT.name(), + fileName, presentStream); - InStream data = StreamUtils.createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, - null, bufferSize, dataStream); + SettableUncompressedStream data = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.DATA.name(), fileName, + dataStream); - InStream length = StreamUtils.createInStream(OrcProto.Stream.Kind.LENGTH.name(), fileName, - null, bufferSize, lengthStream); + SettableUncompressedStream length = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.LENGTH.name(), + fileName, lengthStream); boolean isFileCompressed = compressionCodec != null; return new BinaryStreamReader(columnIndex, present, data, length, isFileCompressed, - columnEncoding, rowIndex); + columnEncoding); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BooleanStreamReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BooleanStreamReader.java index dfa1176..0daff12 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BooleanStreamReader.java +++ 
llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BooleanStreamReader.java @@ -18,44 +18,67 @@ package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream; import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import com.google.common.collect.Lists; + /** * Stream reader for boolean column type. */ public class BooleanStreamReader extends RecordReaderImpl.BooleanTreeReader { - private boolean isFileCompressed; + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _dataStream; - private BooleanStreamReader(int columnId, InStream present, - InStream data, boolean isFileCompressed, - OrcProto.RowIndexEntry rowIndex) throws IOException { + private BooleanStreamReader(int columnId, SettableUncompressedStream present, + SettableUncompressedStream data, boolean isFileCompressed) throws IOException { super(columnId, present, data); - this.isFileCompressed = isFileCompressed; - - // position the readers based on the specified row index - seek(StreamUtils.getPositionProvider(rowIndex)); + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + this._dataStream = data; } @Override public void seek(PositionProvider index) throws IOException { if (present != null) { - if (isFileCompressed) { + if (_isFileCompressed) { index.getNext(); } present.seek(index); } - if (isFileCompressed) { - index.getNext(); + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. 
+ if (_dataStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + reader.seek(index); + } + } + + public void setBuffers(EncodedColumnBatch.StreamBuffer presentStreamBuffer, + EncodedColumnBatch.StreamBuffer dataStreamBuffer) { + long length; + if (_presentStream != null) { + List presentDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(presentStreamBuffer, presentDiskRanges); + _presentStream.setBuffers(presentDiskRanges, length); + } + if (_dataStream != null) { + List dataDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(dataStreamBuffer, dataDiskRanges); + _dataStream.setBuffers(dataDiskRanges, length); } - reader.seek(index); } public static class StreamReaderBuilder { @@ -64,8 +87,6 @@ public void seek(PositionProvider index) throws IOException { private EncodedColumnBatch.StreamBuffer presentStream; private EncodedColumnBatch.StreamBuffer dataStream; private CompressionCodec compressionCodec; - private int bufferSize; - private OrcProto.RowIndexEntry rowIndex; public StreamReaderBuilder setFileName(String fileName) { this.fileName = fileName; @@ -92,25 +113,15 @@ public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec return this; } - public StreamReaderBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) { - this.rowIndex = rowIndex; - return this; - } - public BooleanStreamReader build() throws IOException { - InStream present = StreamUtils.createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, - null, bufferSize, presentStream); + SettableUncompressedStream present = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.PRESENT.name(), + fileName, presentStream); - InStream data = StreamUtils.createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, - null, bufferSize, dataStream); + SettableUncompressedStream data = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.DATA.name(), fileName, + dataStream); boolean isFileCompressed = compressionCodec != null; - return new BooleanStreamReader(columnIndex, present, data, isFileCompressed, rowIndex); + return new BooleanStreamReader(columnIndex, present, data, isFileCompressed); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ByteStreamReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ByteStreamReader.java index 6e740e6..6f98fe0 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ByteStreamReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ByteStreamReader.java @@ -18,44 +18,67 @@ package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream; import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import com.google.common.collect.Lists; + /** * Stream reader for byte column type. 
*/ public class ByteStreamReader extends RecordReaderImpl.ByteTreeReader { - private boolean isFileCompressed; + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _dataStream; - private ByteStreamReader(int columnId, InStream present, - InStream data, boolean isFileCompressed, - OrcProto.RowIndexEntry rowIndex) throws IOException { + private ByteStreamReader(int columnId, SettableUncompressedStream present, + SettableUncompressedStream data, boolean isFileCompressed) throws IOException { super(columnId, present, data); - this.isFileCompressed = isFileCompressed; - - // position the readers based on the specified row index - seek(StreamUtils.getPositionProvider(rowIndex)); + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + this._dataStream = data; } @Override public void seek(PositionProvider index) throws IOException { if (present != null) { - if (isFileCompressed) { + if (_isFileCompressed) { index.getNext(); } present.seek(index); } - if (isFileCompressed) { - index.getNext(); + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. + if (_dataStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + reader.seek(index); + } + } + + public void setBuffers(EncodedColumnBatch.StreamBuffer presentStreamBuffer, + EncodedColumnBatch.StreamBuffer dataStreamBuffer) { + long length; + if (_presentStream != null) { + List presentDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(presentStreamBuffer, presentDiskRanges); + _presentStream.setBuffers(presentDiskRanges, length); + } + if (_dataStream != null) { + List dataDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(dataStreamBuffer, dataDiskRanges); + _dataStream.setBuffers(dataDiskRanges, length); } - reader.seek(index); } public static class StreamReaderBuilder { @@ -64,8 +87,6 @@ public void seek(PositionProvider index) throws IOException { private EncodedColumnBatch.StreamBuffer presentStream; private EncodedColumnBatch.StreamBuffer dataStream; private CompressionCodec compressionCodec; - private int bufferSize; - private OrcProto.RowIndexEntry rowIndex; public StreamReaderBuilder setFileName(String fileName) { this.fileName = fileName; @@ -92,25 +113,15 @@ public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec return this; } - public StreamReaderBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) { - this.rowIndex = rowIndex; - return this; - } - public ByteStreamReader build() throws IOException { - InStream present = StreamUtils.createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, - null, bufferSize, presentStream); + SettableUncompressedStream present = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.PRESENT.name(), + fileName, presentStream); - InStream data = StreamUtils.createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, - null, bufferSize, dataStream); + SettableUncompressedStream data = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.DATA.name(), fileName, + dataStream); boolean isFileCompressed = compressionCodec != null; - return new ByteStreamReader(columnIndex, present, data, isFileCompressed, rowIndex); + return new ByteStreamReader(columnIndex, present, data, 
isFileCompressed); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/CharacterStreamReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/CharacterStreamReader.java index 61c5008..6ec3e97 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/CharacterStreamReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/CharacterStreamReader.java @@ -18,30 +18,35 @@ package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream; import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import com.google.common.collect.Lists; + /** * Stream reader for char and varchar column types. */ public class CharacterStreamReader extends RecordReaderImpl.StringTreeReader { - private boolean isFileCompressed; - private boolean isDictionaryEncoding; + private boolean _isFileCompressed; + private boolean _isDictionaryEncoding; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _dataStream; + private SettableUncompressedStream _lengthStream; + private SettableUncompressedStream _dictionaryStream; private CharacterStreamReader(int columnId, int maxLength, OrcProto.Type charType, - InStream present, - InStream data, InStream length, InStream dictionary, - boolean isFileCompressed, - OrcProto.ColumnEncoding encoding, - OrcProto.RowIndexEntry rowIndex) throws IOException { + SettableUncompressedStream present, SettableUncompressedStream data, SettableUncompressedStream length, SettableUncompressedStream dictionary, + boolean isFileCompressed, OrcProto.ColumnEncoding encoding) throws IOException { super(columnId); - this.isDictionaryEncoding = dictionary != null; + this._isDictionaryEncoding = dictionary != null; if (charType.getKind() == OrcProto.Type.Kind.CHAR) { reader = new RecordReaderImpl.CharTreeReader(columnId, maxLength, present, data, length, dictionary, encoding); @@ -51,37 +56,90 @@ private CharacterStreamReader(int columnId, int maxLength, OrcProto.Type charTyp } else { throw new IOException("Unknown character type " + charType + ". Expected CHAR or VARCHAR."); } - - this.isFileCompressed = isFileCompressed; - - // position the readers based on the specified row index - seek(StreamUtils.getPositionProvider(rowIndex)); + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + this._dataStream = data; + this._lengthStream = length; + this._dictionaryStream = dictionary; } @Override public void seek(PositionProvider index) throws IOException { if (present != null) { - if (isFileCompressed) { + if (_isFileCompressed) { index.getNext(); } reader.present.seek(index); } - if (isDictionaryEncoding) { - if (isFileCompressed) { - index.getNext(); + if (_isDictionaryEncoding) { + // DICTIONARY encoding + + // data stream could be empty stream or already reached end of stream before present stream. 
+ // This can happen if all values in stream are nulls or last row group values are all null. + if (_dataStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + ((RecordReaderImpl.StringDictionaryTreeReader) reader).reader.seek(index); } - ((RecordReaderImpl.StringDictionaryTreeReader)reader).reader.seek(index); } else { - if (isFileCompressed) { - index.getNext(); + // DIRECT encoding + + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. + if (_dataStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + ((RecordReaderImpl.StringDirectTreeReader) reader).stream.seek(index); } - ((RecordReaderImpl.StringDirectTreeReader)reader).stream.seek(index); - if (isFileCompressed) { - index.getNext(); + if (_lengthStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + ((RecordReaderImpl.StringDirectTreeReader) reader).lengths.seek(index); + } + } + } + + public void setBuffers(EncodedColumnBatch.StreamBuffer presentStreamBuffer, + EncodedColumnBatch.StreamBuffer dataStreamBuffer, + EncodedColumnBatch.StreamBuffer lengthStreamBuffer, + EncodedColumnBatch.StreamBuffer dictionaryStreamBuffer, + boolean sameStripe) { + long length; + if (_presentStream != null) { + List presentDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(presentStreamBuffer, presentDiskRanges); + _presentStream.setBuffers(presentDiskRanges, length); + } + if (_dataStream != null) { + List dataDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(dataStreamBuffer, dataDiskRanges); + _dataStream.setBuffers(dataDiskRanges, length); + } + if (!_isDictionaryEncoding) { + if (_lengthStream != null) { + List lengthDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(lengthStreamBuffer, lengthDiskRanges); + _lengthStream.setBuffers(lengthDiskRanges, length); + } + } + + // set these streams only if the stripe is different + if (!sameStripe && _isDictionaryEncoding) { + if (_lengthStream != null) { + List lengthDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(lengthStreamBuffer, lengthDiskRanges); + _lengthStream.setBuffers(lengthDiskRanges, length); + } + if (_dictionaryStream != null) { + List dictionaryDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(dictionaryStreamBuffer, dictionaryDiskRanges); + _dictionaryStream.setBuffers(dictionaryDiskRanges, length); } - ((RecordReaderImpl.StringDirectTreeReader)reader).lengths.seek(index); } } @@ -95,8 +153,6 @@ public void seek(PositionProvider index) throws IOException { private EncodedColumnBatch.StreamBuffer dictionaryStream; private EncodedColumnBatch.StreamBuffer lengthStream; private CompressionCodec compressionCodec; - private int bufferSize; - private OrcProto.RowIndexEntry rowIndex; private OrcProto.ColumnEncoding columnEncoding; public StreamReaderBuilder setFileName(String fileName) { @@ -144,37 +200,27 @@ public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec return this; } - public StreamReaderBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) { - this.rowIndex = rowIndex; - return this; - } - public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { this.columnEncoding = encoding; return this; } public 
CharacterStreamReader build() throws IOException { - InStream present = StreamUtils.createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, - null, bufferSize, presentStream); + SettableUncompressedStream present = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.PRESENT.name(), + fileName, presentStream); - InStream data = StreamUtils.createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, - null, bufferSize, dataStream); + SettableUncompressedStream data = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.DATA.name(), fileName, + dataStream); - InStream length = StreamUtils.createInStream(OrcProto.Stream.Kind.LENGTH.name(), fileName, - null, bufferSize, lengthStream); + SettableUncompressedStream length = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.LENGTH.name(), fileName, + lengthStream); - InStream dictionary = StreamUtils.createInStream(OrcProto.Stream.Kind.DICTIONARY_DATA.name(), - fileName, null, bufferSize, dictionaryStream); + SettableUncompressedStream dictionary = StreamUtils.createLlapInStream( + OrcProto.Stream.Kind.DICTIONARY_DATA.name(), fileName, dictionaryStream); boolean isFileCompressed = compressionCodec != null; return new CharacterStreamReader(columnIndex, maxLength, charType, present, data, length, - dictionary, isFileCompressed, columnEncoding, rowIndex); + dictionary, isFileCompressed, columnEncoding); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DateStreamReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DateStreamReader.java index b7f35ea..ff49503 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DateStreamReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DateStreamReader.java @@ -18,30 +18,34 @@ package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream; import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import com.google.common.collect.Lists; + /** * Stream reader for date column type. 
*/ public class DateStreamReader extends RecordReaderImpl.DateTreeReader { private boolean isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _dataStream; - private DateStreamReader(int columnId, InStream present, - InStream data, boolean isFileCompressed, - OrcProto.ColumnEncoding encoding, - OrcProto.RowIndexEntry rowIndex) throws IOException { + private DateStreamReader(int columnId, SettableUncompressedStream present, + SettableUncompressedStream data, boolean isFileCompressed, + OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present, data, encoding); this.isFileCompressed = isFileCompressed; - - // position the readers based on the specified row index - seek(StreamUtils.getPositionProvider(rowIndex)); + this._presentStream = present; + this._dataStream = data; } @Override @@ -53,10 +57,29 @@ public void seek(PositionProvider index) throws IOException { present.seek(index); } - if (isFileCompressed) { - index.getNext(); + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. + if (_dataStream.available() > 0) { + if (isFileCompressed) { + index.getNext(); + } + reader.seek(index); + } + } + + public void setBuffers(EncodedColumnBatch.StreamBuffer presentStreamBuffer, + EncodedColumnBatch.StreamBuffer dataStreamBuffer) { + long length; + if (_presentStream != null) { + List presentDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(presentStreamBuffer, presentDiskRanges); + _presentStream.setBuffers(presentDiskRanges, length); + } + if (_dataStream != null) { + List dataDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(dataStreamBuffer, dataDiskRanges); + _dataStream.setBuffers(dataDiskRanges, length); } - reader.seek(index); } public static class StreamReaderBuilder { @@ -65,8 +88,6 @@ public void seek(PositionProvider index) throws IOException { private EncodedColumnBatch.StreamBuffer presentStream; private EncodedColumnBatch.StreamBuffer dataStream; private CompressionCodec compressionCodec; - private int bufferSize; - private OrcProto.RowIndexEntry rowIndex; private OrcProto.ColumnEncoding columnEncoding; public StreamReaderBuilder setFileName(String fileName) { @@ -94,32 +115,22 @@ public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec return this; } - public StreamReaderBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) { - this.rowIndex = rowIndex; - return this; - } - public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { this.columnEncoding = encoding; return this; } public DateStreamReader build() throws IOException { - InStream present = StreamUtils.createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, - null, bufferSize, presentStream); + SettableUncompressedStream present = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.PRESENT.name(), + fileName, presentStream); - InStream data = StreamUtils.createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, - null, bufferSize, dataStream); + SettableUncompressedStream data = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.DATA.name(), fileName, + dataStream); boolean isFileCompressed = compressionCodec != null; return new DateStreamReader(columnIndex, present, data, isFileCompressed, - 
columnEncoding, rowIndex); + columnEncoding); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DecimalStreamReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DecimalStreamReader.java index 45ceaa7..6d78caf 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DecimalStreamReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DecimalStreamReader.java @@ -18,49 +18,83 @@ package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream; import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import com.google.common.collect.Lists; + /** * Stream reader for decimal column type. */ public class DecimalStreamReader extends RecordReaderImpl.DecimalTreeReader { - private boolean isFileCompressed; - - private DecimalStreamReader(int columnId, int precision, int scale, InStream presentStream, - InStream valueStream, InStream scaleStream, boolean isFileCompressed, - OrcProto.RowIndexEntry rowIndex, OrcProto.ColumnEncoding encoding) throws IOException { + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _valueStream; + private SettableUncompressedStream _scaleStream; + + private DecimalStreamReader(int columnId, int precision, int scale, SettableUncompressedStream presentStream, + SettableUncompressedStream valueStream, SettableUncompressedStream scaleStream, boolean isFileCompressed, + OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, precision, scale, presentStream, valueStream, scaleStream, encoding); - this.isFileCompressed = isFileCompressed; - - // position the readers based on the specified row index - seek(StreamUtils.getPositionProvider(rowIndex)); + this._isFileCompressed = isFileCompressed; + this._presentStream = presentStream; + this._valueStream = valueStream; + this._scaleStream = scaleStream; } @Override public void seek(PositionProvider index) throws IOException { if (present != null) { - if (isFileCompressed) { + if (_isFileCompressed) { index.getNext(); } present.seek(index); } - if (isFileCompressed) { - index.getNext(); + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. 
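A pattern worth noting here: DateStreamReader above, DecimalStreamReader in this hunk, and every reader that follows drop the constructor-time seek against a RowIndexEntry together with the setBufferSize/setRowIndex builder setters, so positioning becomes the caller's responsibility. A minimal sketch of that assumed consumer-side lifecycle; the ColumnStreamReader interface and the loop below are illustrative stand-ins, not code from this patch:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.PositionProvider;

// Illustrative stand-in for the concrete readers in this patch (Date, Decimal, ...).
interface ColumnStreamReader {
  void setBuffers(EncodedColumnBatch.StreamBuffer present,
      EncodedColumnBatch.StreamBuffer data);            // re-point streams at cached buffers
  void seek(PositionProvider index) throws IOException; // position at a row group boundary
}

class RowGroupLoop {
  // Assumed flow: wire the cached buffers once per batch, then seek per row group.
  void readRowGroups(ColumnStreamReader reader, EncodedColumnBatch.StreamBuffer present,
      EncodedColumnBatch.StreamBuffer data, List<PositionProvider> rowGroupPositions)
      throws IOException {
    reader.setBuffers(present, data);
    for (PositionProvider position : rowGroupPositions) {
      reader.seek(position);
      // ... decode this row group's values into a ColumnVectorBatch ...
    }
  }
}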
+ if (_valueStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + value.seek(index); + } + + if (_scaleStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + scaleReader.seek(index); } - valueStream.seek(index); + } - if (isFileCompressed) { - index.getNext(); + public void setBuffers(EncodedColumnBatch.StreamBuffer presentStreamBuffer, + EncodedColumnBatch.StreamBuffer valueStreamBuffer, + EncodedColumnBatch.StreamBuffer scaleStreamBuffer) { + long length; + if (_presentStream != null) { + List presentDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(presentStreamBuffer, presentDiskRanges); + _presentStream.setBuffers(presentDiskRanges, length); + } + if (_valueStream != null) { + List valueDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(valueStreamBuffer, valueDiskRanges); + _valueStream.setBuffers(valueDiskRanges, length); + } + if (_scaleStream != null) { + List scaleDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(scaleStreamBuffer, scaleDiskRanges); + _scaleStream.setBuffers(scaleDiskRanges, length); } - scaleReader.seek(index); } public static class StreamReaderBuilder { @@ -72,8 +106,6 @@ public void seek(PositionProvider index) throws IOException { private int scale; private int precision; private CompressionCodec compressionCodec; - private int bufferSize; - private OrcProto.RowIndexEntry rowIndex; private OrcProto.ColumnEncoding columnEncoding; public StreamReaderBuilder setFileName(String fileName) { @@ -116,34 +148,24 @@ public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec return this; } - public StreamReaderBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) { - this.rowIndex = rowIndex; - return this; - } - public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { this.columnEncoding = encoding; return this; } public DecimalStreamReader build() throws IOException { - InStream presentInStream = StreamUtils.createInStream(OrcProto.Stream.Kind.PRESENT.name(), - fileName, null, bufferSize, presentStream); + SettableUncompressedStream presentInStream = StreamUtils.createLlapInStream( + OrcProto.Stream.Kind.PRESENT.name(), fileName, presentStream); - InStream valueInStream = StreamUtils.createInStream(OrcProto.Stream.Kind.DATA.name(), - fileName, null, bufferSize, valueStream); + SettableUncompressedStream valueInStream = StreamUtils.createLlapInStream( + OrcProto.Stream.Kind.DATA.name(), fileName, valueStream); - InStream scaleInStream = StreamUtils.createInStream(OrcProto.Stream.Kind.SECONDARY.name(), - fileName, null, bufferSize, scaleStream); + SettableUncompressedStream scaleInStream = StreamUtils.createLlapInStream( + OrcProto.Stream.Kind.SECONDARY.name(), fileName, scaleStream); boolean isFileCompressed = compressionCodec != null; return new DecimalStreamReader(columnIndex, precision, scale, presentInStream, valueInStream, - scaleInStream, isFileCompressed, rowIndex, columnEncoding); + scaleInStream, isFileCompressed, columnEncoding); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DoubleStreamReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DoubleStreamReader.java index 95edef5..eef9e1f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DoubleStreamReader.java +++ 
llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DoubleStreamReader.java @@ -18,44 +18,67 @@ package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream; import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import com.google.common.collect.Lists; + /** * Stream reader for double column type. */ public class DoubleStreamReader extends RecordReaderImpl.DoubleTreeReader { - private boolean isFileCompressed; + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _dataStream; - private DoubleStreamReader(int columnId, InStream present, - InStream data, boolean isFileCompressed, - OrcProto.RowIndexEntry rowIndex) throws IOException { + private DoubleStreamReader(int columnId, SettableUncompressedStream present, + SettableUncompressedStream data, boolean isFileCompressed) throws IOException { super(columnId, present, data); - this.isFileCompressed = isFileCompressed; - - // position the readers based on the specified row index - seek(StreamUtils.getPositionProvider(rowIndex)); + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + this._dataStream = data; } @Override public void seek(PositionProvider index) throws IOException { if (present != null) { - if (isFileCompressed) { + if (_isFileCompressed) { index.getNext(); } present.seek(index); } - if (isFileCompressed) { - index.getNext(); + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. 
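The guard repeated in each of these seek() overrides boils down to one rule: if the cached stream has no bytes (an all-null row group leaves its stream empty), skip the seek entirely; otherwise, when the file is compressed, discard one index value first, because the row index carries a compressed-chunk offset that is meaningless for data the LLAP cache already holds decompressed. A hedged restatement as a standalone helper; SeekTarget is a hypothetical functional interface and the real readers inline this logic:

import java.io.IOException;
import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream;
import org.apache.hadoop.hive.ql.io.orc.PositionProvider;

final class SeekGuard {
  interface SeekTarget { void seek(PositionProvider index) throws IOException; }

  // Mirrors the per-stream logic of the readers in this patch; purely illustrative.
  static void maybeSeek(SettableUncompressedStream stream, boolean isFileCompressed,
      PositionProvider index, SeekTarget target) throws IOException {
    if (stream.available() <= 0) {
      return;            // empty or exhausted stream: nothing to position (values were null)
    }
    if (isFileCompressed) {
      index.getNext();   // drop the compressed-chunk offset recorded in the file's row index
    }
    target.seek(index);  // seek the RLE/byte reader wrapped around this stream
  }
}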
+ if (_dataStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + stream.seek(index); + } + } + + public void setBuffers(EncodedColumnBatch.StreamBuffer presentStreamBuffer, + EncodedColumnBatch.StreamBuffer dataStreamBuffer) { + long length; + if (_presentStream != null) { + List presentDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(presentStreamBuffer, presentDiskRanges); + _presentStream.setBuffers(presentDiskRanges, length); + } + if (_dataStream != null) { + List dataDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(dataStreamBuffer, dataDiskRanges); + _dataStream.setBuffers(dataDiskRanges, length); } - stream.seek(index); } public static class StreamReaderBuilder { @@ -64,8 +87,6 @@ public void seek(PositionProvider index) throws IOException { private EncodedColumnBatch.StreamBuffer presentStream; private EncodedColumnBatch.StreamBuffer dataStream; private CompressionCodec compressionCodec; - private int bufferSize; - private OrcProto.RowIndexEntry rowIndex; public StreamReaderBuilder setFileName(String fileName) { this.fileName = fileName; @@ -92,25 +113,15 @@ public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec return this; } - public StreamReaderBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) { - this.rowIndex = rowIndex; - return this; - } - public DoubleStreamReader build() throws IOException { - InStream present = StreamUtils.createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, - null, bufferSize, presentStream); + SettableUncompressedStream present = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.PRESENT.name(), + fileName, presentStream); - InStream data = StreamUtils.createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, - null, bufferSize, dataStream); + SettableUncompressedStream data = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.DATA.name(), fileName, + dataStream); boolean isFileCompressed = compressionCodec != null; - return new DoubleStreamReader(columnIndex, present, data, isFileCompressed, rowIndex); + return new DoubleStreamReader(columnIndex, present, data, isFileCompressed); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/FloatStreamReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/FloatStreamReader.java index cc40c8e..8067787 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/FloatStreamReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/FloatStreamReader.java @@ -18,44 +18,67 @@ package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream; import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import com.google.common.collect.Lists; + /** * Stream reader for float column type. 
*/ public class FloatStreamReader extends RecordReaderImpl.FloatTreeReader { - private boolean isFileCompressed; + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _dataStream; - private FloatStreamReader(int columnId, InStream present, - InStream data, boolean isFileCompressed, - OrcProto.RowIndexEntry rowIndex) throws IOException { + private FloatStreamReader(int columnId, SettableUncompressedStream present, + SettableUncompressedStream data, boolean isFileCompressed) throws IOException { super(columnId, present, data); - this.isFileCompressed = isFileCompressed; - - // position the readers based on the specified row index - seek(StreamUtils.getPositionProvider(rowIndex)); + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + this._dataStream = data; } @Override public void seek(PositionProvider index) throws IOException { if (present != null) { - if (isFileCompressed) { + if (_isFileCompressed) { index.getNext(); } present.seek(index); } - if (isFileCompressed) { - index.getNext(); + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. + if (_dataStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + stream.seek(index); + } + } + + public void setBuffers(EncodedColumnBatch.StreamBuffer presentStreamBuffer, + EncodedColumnBatch.StreamBuffer dataStreamBuffer) { + long length; + if (_presentStream != null) { + List presentDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(presentStreamBuffer, presentDiskRanges); + _presentStream.setBuffers(presentDiskRanges, length); + } + if (_dataStream != null) { + List dataDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(dataStreamBuffer, dataDiskRanges); + _dataStream.setBuffers(dataDiskRanges, length); } - stream.seek(index); } public static class StreamReaderBuilder { @@ -64,8 +87,6 @@ public void seek(PositionProvider index) throws IOException { private EncodedColumnBatch.StreamBuffer presentStream; private EncodedColumnBatch.StreamBuffer dataStream; private CompressionCodec compressionCodec; - private int bufferSize; - private OrcProto.RowIndexEntry rowIndex; public StreamReaderBuilder setFileName(String fileName) { this.fileName = fileName; @@ -92,25 +113,15 @@ public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec return this; } - public StreamReaderBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) { - this.rowIndex = rowIndex; - return this; - } - public FloatStreamReader build() throws IOException { - InStream present = StreamUtils.createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, - null, bufferSize, presentStream); + SettableUncompressedStream present = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.PRESENT.name(), + fileName, presentStream); - InStream data = StreamUtils.createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, - null, bufferSize, dataStream); + SettableUncompressedStream data = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.DATA.name(), fileName, + dataStream); boolean isFileCompressed = compressionCodec != null; - return new FloatStreamReader(columnIndex, present, data, isFileCompressed, rowIndex); + return new FloatStreamReader(columnIndex, present, data, 
isFileCompressed); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java index 033d801..0c48873 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java @@ -18,60 +18,77 @@ package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream; import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import com.google.common.collect.Lists; + + /** * Stream reader for integer column type. */ public class IntStreamReader extends RecordReaderImpl.IntTreeReader { - private boolean isFileCompressed; - private InStream data; + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _dataStream; - private IntStreamReader(int columnId, InStream present, - InStream data, boolean isFileCompressed, - OrcProto.ColumnEncoding encoding, - OrcProto.RowIndexEntry rowIndex) throws IOException { + private IntStreamReader(int columnId, SettableUncompressedStream present, + SettableUncompressedStream data, boolean isFileCompressed, + OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present, data, encoding); - this.isFileCompressed = isFileCompressed; - this.data = data; - - // position the readers based on the specified row index - seek(StreamUtils.getPositionProvider(rowIndex)); + this._isFileCompressed = isFileCompressed; + this._dataStream = data; + this._presentStream = present; } @Override public void seek(PositionProvider index) throws IOException { if (present != null) { - if (isFileCompressed) { + if (_isFileCompressed) { index.getNext(); } present.seek(index); } - // data stream could be empty stream. This could happen if all values in row group are nulls. - if (reader != null && data.getStreamLength() > 0) { - if (isFileCompressed) { + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. 
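Every setBuffers() in this patch, including IntStreamReader's immediately below, performs the same wiring: flatten the column's cached StreamBuffer into a list of DiskRanges, total up their length, and hand both to the SettableUncompressedStream. A sketch of that wiring under the assumption that the cached data is exposed as raw ByteBuffers; the real accounting lives in StreamUtils.createDiskRanges and may differ in detail (offsets here are simplified to a 0-based running total):

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
import com.google.common.collect.Lists;

final class BufferWiring {
  // Illustrative equivalent of the createDiskRanges(...) + setBuffers(...) pairs above.
  static long wire(SettableUncompressedStream stream, List<ByteBuffer> cachedBuffers) {
    List<DiskRange> ranges = Lists.newArrayList();
    long length = 0;
    for (ByteBuffer bb : cachedBuffers) {
      // BufferChunk is made public later in this patch so LLAP code can build ranges directly.
      ranges.add(new RecordReaderImpl.BufferChunk(bb.duplicate(), length));
      length += bb.remaining();
    }
    stream.setBuffers(ranges, length);  // total length = sum of the buffers' bytes
    return length;
  }
}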
+ if (_dataStream.available() > 0) { + if (_isFileCompressed) { index.getNext(); } reader.seek(index); } } + public void setBuffers(EncodedColumnBatch.StreamBuffer presentStreamBuffer, + EncodedColumnBatch.StreamBuffer dataStreamBuffer) { + long length; + if (_presentStream != null) { + List presentDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(presentStreamBuffer, presentDiskRanges); + _presentStream.setBuffers(presentDiskRanges, length); + } + if (_dataStream != null) { + List dataDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(dataStreamBuffer, dataDiskRanges); + _dataStream.setBuffers(dataDiskRanges, length); + } + } + public static class StreamReaderBuilder { private String fileName; private int columnIndex; private EncodedColumnBatch.StreamBuffer presentStream; private EncodedColumnBatch.StreamBuffer dataStream; private CompressionCodec compressionCodec; - private int bufferSize; - private OrcProto.RowIndexEntry rowIndex; private OrcProto.ColumnEncoding columnEncoding; public StreamReaderBuilder setFileName(String fileName) { @@ -99,31 +116,21 @@ public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec return this; } - public StreamReaderBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) { - this.rowIndex = rowIndex; - return this; - } - public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { this.columnEncoding = encoding; return this; } public IntStreamReader build() throws IOException { - InStream present = StreamUtils.createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, - null, bufferSize, presentStream); + SettableUncompressedStream present = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.PRESENT.name(), + fileName, presentStream); - InStream data = StreamUtils.createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, - null, bufferSize, dataStream); + SettableUncompressedStream data = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.DATA.name(), fileName, + dataStream); boolean isFileCompressed = compressionCodec != null; return new IntStreamReader(columnIndex, present, data, isFileCompressed, - columnEncoding, rowIndex); + columnEncoding); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java index fe78e52..d94d4b6 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java @@ -18,45 +18,68 @@ package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream; import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import com.google.common.collect.Lists; + /** * Stream reader for long column type. 
*/ public class LongStreamReader extends RecordReaderImpl.LongTreeReader { - private boolean isFileCompressed; + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _dataStream; - private LongStreamReader(int columnId, InStream present, - InStream data, boolean isFileCompressed, - OrcProto.ColumnEncoding encoding, boolean skipCorrupt, - OrcProto.RowIndexEntry rowIndex) throws IOException { + private LongStreamReader(int columnId, SettableUncompressedStream present, + SettableUncompressedStream data, boolean isFileCompressed, + OrcProto.ColumnEncoding encoding, boolean skipCorrupt) throws IOException { super(columnId, present, data, encoding, skipCorrupt); - this.isFileCompressed = isFileCompressed; - - // position the readers based on the specified row index - seek(StreamUtils.getPositionProvider(rowIndex)); + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + this._dataStream = data; } @Override public void seek(PositionProvider index) throws IOException { if (present != null) { - if (isFileCompressed) { + if (_isFileCompressed) { index.getNext(); } present.seek(index); } - if (isFileCompressed) { - index.getNext(); + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. + if (_dataStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + reader.seek(index); + } + } + + public void setBuffers(EncodedColumnBatch.StreamBuffer presentStreamBuffer, + EncodedColumnBatch.StreamBuffer dataStreamBuffer) { + long length; + if (_presentStream != null) { + List presentDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(presentStreamBuffer, presentDiskRanges); + _presentStream.setBuffers(presentDiskRanges, length); + } + if (_dataStream != null) { + List dataDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(dataStreamBuffer, dataDiskRanges); + _dataStream.setBuffers(dataDiskRanges, length); } - reader.seek(index); } public static class StreamReaderBuilder { @@ -65,8 +88,6 @@ public void seek(PositionProvider index) throws IOException { private EncodedColumnBatch.StreamBuffer presentStream; private EncodedColumnBatch.StreamBuffer dataStream; private CompressionCodec compressionCodec; - private int bufferSize; - private OrcProto.RowIndexEntry rowIndex; private OrcProto.ColumnEncoding columnEncoding; private boolean skipCorrupt; @@ -95,16 +116,6 @@ public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec return this; } - public StreamReaderBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) { - this.rowIndex = rowIndex; - return this; - } - public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { this.columnEncoding = encoding; return this; @@ -116,15 +127,15 @@ public StreamReaderBuilder skipCorrupt(boolean skipCorrupt) { } public LongStreamReader build() throws IOException { - InStream present = StreamUtils.createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, - null, bufferSize, presentStream); + SettableUncompressedStream present = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.PRESENT.name(), + fileName, presentStream); - InStream data = StreamUtils.createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, - null, bufferSize, 
dataStream); + SettableUncompressedStream data = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.DATA.name(), fileName, + dataStream); boolean isFileCompressed = compressionCodec != null; return new LongStreamReader(columnIndex, present, data, isFileCompressed, - columnEncoding, skipCorrupt, rowIndex); + columnEncoding, skipCorrupt); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java index e730595..88d2c0d 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java @@ -18,30 +18,34 @@ package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream; import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import com.google.common.collect.Lists; + /** * Stream reader for short column type. */ public class ShortStreamReader extends RecordReaderImpl.ShortTreeReader { private boolean isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _dataStream; - private ShortStreamReader(int columnId, InStream present, - InStream data, boolean isFileCompressed, - OrcProto.ColumnEncoding encoding, - OrcProto.RowIndexEntry rowIndex) throws IOException { + private ShortStreamReader(int columnId, SettableUncompressedStream present, + SettableUncompressedStream data, boolean isFileCompressed, + OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present, data, encoding); this.isFileCompressed = isFileCompressed; - - // position the readers based on the specified row index - seek(StreamUtils.getPositionProvider(rowIndex)); + this._presentStream = present; + this._dataStream = data; } @Override @@ -53,10 +57,29 @@ public void seek(PositionProvider index) throws IOException { present.seek(index); } - if (isFileCompressed) { - index.getNext(); + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. 
+ if (_dataStream.available() > 0) { + if (isFileCompressed) { + index.getNext(); + } + reader.seek(index); + } + } + + public void setBuffers(EncodedColumnBatch.StreamBuffer presentStreamBuffer, + EncodedColumnBatch.StreamBuffer dataStreamBuffer) { + long length; + if (_presentStream != null) { + List presentDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(presentStreamBuffer, presentDiskRanges); + _presentStream.setBuffers(presentDiskRanges, length); + } + if (_dataStream != null) { + List dataDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(dataStreamBuffer, dataDiskRanges); + _dataStream.setBuffers(dataDiskRanges, length); } - reader.seek(index); } public static class StreamReaderBuilder { @@ -65,8 +88,6 @@ public void seek(PositionProvider index) throws IOException { private EncodedColumnBatch.StreamBuffer presentStream; private EncodedColumnBatch.StreamBuffer dataStream; private CompressionCodec compressionCodec; - private int bufferSize; - private OrcProto.RowIndexEntry rowIndex; private OrcProto.ColumnEncoding columnEncoding; public StreamReaderBuilder setFileName(String fileName) { @@ -94,31 +115,21 @@ public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec return this; } - public StreamReaderBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) { - this.rowIndex = rowIndex; - return this; - } - public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { this.columnEncoding = encoding; return this; } public ShortStreamReader build() throws IOException { - InStream present = StreamUtils.createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, - null, bufferSize, presentStream); + SettableUncompressedStream present = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.PRESENT.name(), + fileName, presentStream); - InStream data = StreamUtils.createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, - null, bufferSize, dataStream); + SettableUncompressedStream data = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.DATA.name(), fileName, + dataStream); boolean isFileCompressed = compressionCodec != null; return new ShortStreamReader(columnIndex, present, data, isFileCompressed, - columnEncoding, rowIndex); + columnEncoding); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/StringStreamReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/StringStreamReader.java index ddb9357..65e3dd6 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/StringStreamReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/StringStreamReader.java @@ -18,59 +18,119 @@ package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream; import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import 
com.google.common.collect.Lists; + /** * Stream reader for string column type. */ public class StringStreamReader extends RecordReaderImpl.StringTreeReader { - private boolean isFileCompressed; - private boolean isDictionaryEncoding; - - private StringStreamReader(int columnId, InStream present, - InStream data, InStream length, InStream dictionary, - boolean isFileCompressed, - OrcProto.ColumnEncoding encoding, - OrcProto.RowIndexEntry rowIndex) throws IOException { + private boolean _isFileCompressed; + private boolean _isDictionaryEncoding; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _dataStream; + private SettableUncompressedStream _lengthStream; + private SettableUncompressedStream _dictionaryStream; + + private StringStreamReader(int columnId, SettableUncompressedStream present, + SettableUncompressedStream data, SettableUncompressedStream length, SettableUncompressedStream dictionary, + boolean isFileCompressed, OrcProto.ColumnEncoding encoding) throws IOException { super(columnId, present, data, length, dictionary, encoding); - this.isDictionaryEncoding = dictionary != null; - this.isFileCompressed = isFileCompressed; - - // position the readers based on the specified row index - seek(StreamUtils.getPositionProvider(rowIndex)); + this._isDictionaryEncoding = dictionary != null; + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + this._dataStream = data; + this._lengthStream = length; + this._dictionaryStream = dictionary; } @Override public void seek(PositionProvider index) throws IOException { if (present != null) { - if (isFileCompressed) { + if (_isFileCompressed) { index.getNext(); } reader.present.seek(index); } - if (isDictionaryEncoding) { - if (isFileCompressed) { - index.getNext(); + if (_isDictionaryEncoding) { + // DICTIONARY encoding + + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. + if (_dataStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + ((RecordReaderImpl.StringDictionaryTreeReader) reader).reader.seek(index); } - ((RecordReaderImpl.StringDictionaryTreeReader)reader).reader.seek(index); } else { - if (isFileCompressed) { - index.getNext(); + // DIRECT encoding + + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. 
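In the StringStreamReader.setBuffers that follows, PRESENT and DATA are re-pointed unconditionally, while LENGTH and DICTIONARY_DATA depend on the encoding and on whether the consumer is still in the same stripe: DIRECT encoding refreshes LENGTH on every call, whereas DICTIONARY encoding refreshes LENGTH and DICTIONARY_DATA only when the stripe changes, because the dictionary is a per-stripe structure that remains valid across that stripe's row groups. The decision table restated as two small helpers, for illustration only:

final class StringBufferRefreshRule {
  // LENGTH is refreshed on every call for DIRECT encoding, and only on a stripe
  // change for DICTIONARY encoding.
  static boolean refreshLength(boolean dictionaryEncoding, boolean sameStripe) {
    return !dictionaryEncoding || !sameStripe;
  }

  // DICTIONARY_DATA exists only for DICTIONARY encoding and is refreshed only when
  // moving to a different stripe; within a stripe the cached dictionary is reused.
  static boolean refreshDictionary(boolean dictionaryEncoding, boolean sameStripe) {
    return dictionaryEncoding && !sameStripe;
  }
}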
+ if (_dataStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + ((RecordReaderImpl.StringDirectTreeReader) reader).stream.seek(index); } - ((RecordReaderImpl.StringDirectTreeReader)reader).stream.seek(index); - if (isFileCompressed) { - index.getNext(); + if (_lengthStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + ((RecordReaderImpl.StringDirectTreeReader) reader).lengths.seek(index); + } + } + } + + public void setBuffers(EncodedColumnBatch.StreamBuffer presentStreamBuffer, + EncodedColumnBatch.StreamBuffer dataStreamBuffer, + EncodedColumnBatch.StreamBuffer lengthStreamBuffer, + EncodedColumnBatch.StreamBuffer dictionaryStreamBuffer, + boolean sameStripe) { + long length; + if (_presentStream != null) { + List presentDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(presentStreamBuffer, presentDiskRanges); + _presentStream.setBuffers(presentDiskRanges, length); + } + if (_dataStream != null) { + List dataDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(dataStreamBuffer, dataDiskRanges); + _dataStream.setBuffers(dataDiskRanges, length); + } + if (!_isDictionaryEncoding) { + if (_lengthStream != null) { + List lengthDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(lengthStreamBuffer, lengthDiskRanges); + _lengthStream.setBuffers(lengthDiskRanges, length); + } + } + + // set these streams only if the stripe is different + if (!sameStripe && _isDictionaryEncoding) { + if (_lengthStream != null) { + List lengthDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(lengthStreamBuffer, lengthDiskRanges); + _lengthStream.setBuffers(lengthDiskRanges, length); + } + if (_dictionaryStream != null) { + List dictionaryDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(dictionaryStreamBuffer, dictionaryDiskRanges); + _dictionaryStream.setBuffers(dictionaryDiskRanges, length); } - ((RecordReaderImpl.StringDirectTreeReader)reader).lengths.seek(index); } } @@ -82,8 +142,6 @@ public void seek(PositionProvider index) throws IOException { private EncodedColumnBatch.StreamBuffer dictionaryStream; private EncodedColumnBatch.StreamBuffer lengthStream; private CompressionCodec compressionCodec; - private int bufferSize; - private OrcProto.RowIndexEntry rowIndex; private OrcProto.ColumnEncoding columnEncoding; public StreamReaderBuilder setFileName(String fileName) { @@ -121,37 +179,27 @@ public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec return this; } - public StreamReaderBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) { - this.rowIndex = rowIndex; - return this; - } - public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { this.columnEncoding = encoding; return this; } public StringStreamReader build() throws IOException { - InStream present = StreamUtils.createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, - null, bufferSize, presentStream); + SettableUncompressedStream present = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.PRESENT.name(), + fileName, presentStream); - InStream data = StreamUtils.createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, - null, bufferSize, dataStream); + SettableUncompressedStream data = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.DATA.name(), fileName, + dataStream); - InStream length = 
StreamUtils.createInStream(OrcProto.Stream.Kind.LENGTH.name(), fileName, - null, bufferSize, lengthStream); + SettableUncompressedStream length = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.LENGTH.name(), fileName, + lengthStream); - InStream dictionary = StreamUtils.createInStream(OrcProto.Stream.Kind.DICTIONARY_DATA.name(), - fileName, null, bufferSize, dictionaryStream); + SettableUncompressedStream dictionary = StreamUtils.createLlapInStream( + OrcProto.Stream.Kind.DICTIONARY_DATA.name(), fileName, dictionaryStream); boolean isFileCompressed = compressionCodec != null; return new StringStreamReader(columnIndex, present, data, length, dictionary, - isFileCompressed, columnEncoding, rowIndex); + isFileCompressed, columnEncoding); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/TimestampStreamReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/TimestampStreamReader.java index 574e67f..e31be38 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/TimestampStreamReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/TimestampStreamReader.java @@ -18,30 +18,36 @@ package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers; import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch; +import org.apache.hadoop.hive.llap.io.decode.orc.stream.SettableUncompressedStream; import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec; -import org.apache.hadoop.hive.ql.io.orc.InStream; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.PositionProvider; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import com.google.common.collect.Lists; + /** * Stream reader for timestamp column type. */ public class TimestampStreamReader extends RecordReaderImpl.TimestampTreeReader { private boolean isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _secondsStream; + private SettableUncompressedStream _nanosStream; - private TimestampStreamReader(int columnId, InStream present, - InStream data, InStream nanos, boolean isFileCompressed, - OrcProto.ColumnEncoding encoding, boolean skipCorrupt, - OrcProto.RowIndexEntry rowIndex) throws IOException { + private TimestampStreamReader(int columnId, SettableUncompressedStream present, + SettableUncompressedStream data, SettableUncompressedStream nanos, boolean isFileCompressed, + OrcProto.ColumnEncoding encoding, boolean skipCorrupt) throws IOException { super(columnId, present, data, nanos, encoding, skipCorrupt); this.isFileCompressed = isFileCompressed; - - // position the readers based on the specified row index - seek(StreamUtils.getPositionProvider(rowIndex)); + this._presentStream = present; + this._secondsStream = data; + this._nanosStream = nanos; } @Override @@ -53,15 +59,42 @@ public void seek(PositionProvider index) throws IOException { present.seek(index); } - if (isFileCompressed) { - index.getNext(); + // data stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. 
+ if (_secondsStream.available() > 0) { + if (isFileCompressed) { + index.getNext(); + } + data.seek(index); } - data.seek(index); - if (isFileCompressed) { - index.getNext(); + if (_nanosStream.available() > 0) { + if (isFileCompressed) { + index.getNext(); + } + nanos.seek(index); + } + } + + public void setBuffers(EncodedColumnBatch.StreamBuffer presentStreamBuffer, + EncodedColumnBatch.StreamBuffer secondsStream, + EncodedColumnBatch.StreamBuffer nanosStream) { + long length; + if (_presentStream != null) { + List presentDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(presentStreamBuffer, presentDiskRanges); + _presentStream.setBuffers(presentDiskRanges, length); + } + if (_secondsStream != null) { + List secondsDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(secondsStream, secondsDiskRanges); + _secondsStream.setBuffers(secondsDiskRanges, length); + } + if (_nanosStream != null) { + List nanosDiskRanges = Lists.newArrayList(); + length = StreamUtils.createDiskRanges(nanosStream, nanosDiskRanges); + _nanosStream.setBuffers(nanosDiskRanges, length); } - nanos.seek(index); } public static class StreamReaderBuilder { @@ -71,8 +104,6 @@ public void seek(PositionProvider index) throws IOException { private EncodedColumnBatch.StreamBuffer dataStream; private EncodedColumnBatch.StreamBuffer nanosStream; private CompressionCodec compressionCodec; - private int bufferSize; - private OrcProto.RowIndexEntry rowIndex; private OrcProto.ColumnEncoding columnEncoding; private boolean skipCorrupt; @@ -106,16 +137,6 @@ public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec return this; } - public StreamReaderBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) { - this.rowIndex = rowIndex; - return this; - } - public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { this.columnEncoding = encoding; return this; @@ -127,18 +148,18 @@ public StreamReaderBuilder skipCorrupt(boolean skipCorrupt) { } public TimestampStreamReader build() throws IOException { - InStream present = StreamUtils.createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, - null, bufferSize, presentStream); + SettableUncompressedStream present = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.PRESENT.name(), + fileName, presentStream); - InStream data = StreamUtils.createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, - null, bufferSize, dataStream); + SettableUncompressedStream data = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.DATA.name(), fileName, + dataStream); - InStream nanos = StreamUtils.createInStream(OrcProto.Stream.Kind.SECONDARY.name(), fileName, - null, bufferSize, nanosStream); + SettableUncompressedStream nanos = StreamUtils.createLlapInStream(OrcProto.Stream.Kind.SECONDARY.name(), + fileName, nanosStream); boolean isFileCompressed = compressionCodec != null; return new TimestampStreamReader(columnIndex, present, data, nanos, - isFileCompressed, columnEncoding, skipCorrupt, rowIndex); + isFileCompressed, columnEncoding, skipCorrupt); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java index 6d7d700..ec1f0a9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java @@ -37,6 +37,10 @@ public 
BitFieldReader(InStream input, mask = (1 << bitSize) - 1; } + public void setInStream(InStream inStream) { + this.input.setInStream(inStream); + } + private void readByte() throws IOException { if (input.hasNext()) { current = 0xff & input.next(); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java index 6d6a098..cdf5824 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java @@ -21,7 +21,6 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.ListIterator; @@ -67,9 +66,9 @@ public long getStreamLength() { return length; } - private static class UncompressedStream extends InStream { - private final List bytes; - private final long length; + public static class UncompressedStream extends InStream { + protected List bytes; + protected long length; private long currentOffset; private ByteBuffer range; private int currentRange; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java index adaa413..b1c7f0a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java @@ -62,4 +62,6 @@ */ void nextVector(LongColumnVector previous, long previousLen) throws IOException; + + void setInStream(InStream data); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 9270121..ee59de5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -30,7 +30,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -57,8 +56,8 @@ import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; -import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool; import org.apache.hadoop.hive.ql.io.filters.BloomFilter; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; @@ -80,8 +79,6 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import com.google.common.collect.Lists; - public class RecordReaderImpl implements RecordReader { static final Log LOG = LogFactory.getLog(RecordReaderImpl.class); @@ -279,6 +276,12 @@ public TreeReader(int columnId, InStream in) throws IOException { } } + void setInStream(InStream inStream) { + if (present != null) { + present.setInStream(inStream); + } + } + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) { throw new IOException("Unknown encoding " + encoding + " in column " + @@ -631,6 +634,12 @@ public IntTreeReader(int columnId, InStream present, InStream data, } } + void setInStream(InStream inStream) { + if (reader != null) { + reader.setInStream(inStream); + } + } + @Override void 
checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
     if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
@@ -1293,7 +1302,7 @@ void skipRows(long items) throws IOException {
   }
 
   public static class DecimalTreeReader extends TreeReader{
-    protected InStream valueStream;
+    protected InStream value;
     protected IntegerReader scaleReader = null;
     private LongColumnVector scratchScaleVector;
@@ -1311,7 +1320,7 @@ public DecimalTreeReader(int columnId, int precision, int scale, InStream presen
       this.precision = precision;
       this.scale = scale;
       this.scratchScaleVector = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
-      this.valueStream = valueStream;
+      this.value = valueStream;
       if (scaleStream != null && encoding != null) {
         checkEncoding(encoding);
         this.scaleReader = createIntegerReader(encoding.getKind(), scaleStream, true, false);
@@ -1332,7 +1341,7 @@ void startStripe(Map streams, List encodings
                     ) throws IOException {
       super.startStripe(streams, encodings);
-      valueStream = streams.get(new StreamName(columnId,
+      value = streams.get(new StreamName(columnId,
           OrcProto.Stream.Kind.DATA));
       scaleReader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(
           new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true, false);
@@ -1346,7 +1355,7 @@ void seek(PositionProvider[] index) throws IOException {
     @Override
     public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      valueStream.seek(index);
+      value.seek(index);
       scaleReader.seek(index);
     }
 
@@ -1360,7 +1369,7 @@ Object next(Object previous) throws IOException {
       } else {
         result = (HiveDecimalWritable) previous;
       }
-      result.set(HiveDecimal.create(SerializationUtils.readBigInteger(valueStream),
+      result.set(HiveDecimal.create(SerializationUtils.readBigInteger(value),
           (int) scaleReader.next()));
       return HiveDecimalUtils.enforcePrecisionScale(result, precision, scale);
     }
@@ -1385,7 +1394,7 @@ public Object nextVector(Object previousVector, long batchSize) throws IOExcepti
       // Read value entries based on isNull entries
       if (result.isRepeating) {
         if (!result.isNull[0]) {
-          BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
+          BigInteger bInt = SerializationUtils.readBigInteger(value);
           short scaleInData = (short) scaleReader.next();
           HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
           dec = HiveDecimalUtils.enforcePrecisionScale(dec, precision, scale);
@@ -1397,7 +1406,7 @@ public Object nextVector(Object previousVector, long batchSize) throws IOExcepti
         scaleReader.nextVector(scratchScaleVector, batchSize);
         for (int i = 0; i < batchSize; i++) {
           if (!result.isNull[i]) {
-            BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
+            BigInteger bInt = SerializationUtils.readBigInteger(value);
             short scaleInData = (short) scratchScaleVector.vector[i];
             HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
             dec = HiveDecimalUtils.enforcePrecisionScale(dec, precision, scale);
@@ -1414,7 +1423,7 @@ public Object nextVector(Object previousVector, long batchSize) throws IOExcepti
     void skipRows(long items) throws IOException {
       items = countNonNulls(items);
       for(int i=0; i < items; i++) {
-        SerializationUtils.readBigInteger(valueStream);
+        SerializationUtils.readBigInteger(value);
       }
       scaleReader.skip(items);
     }
@@ -3045,7 +3054,7 @@ private void readAllDataStreams(StripeInformation stripe) throws IOException {
   public static class BufferChunk extends DiskRangeList {
     final ByteBuffer chunk;
 
-    BufferChunk(ByteBuffer chunk, long offset) {
+    public BufferChunk(ByteBuffer chunk, long offset) {
       super(offset, offset + chunk.remaining());
       this.chunk = chunk;
     }
@@ -3114,7 +3123,7 @@ public String toString() {
   /**
    * Plan the ranges of the file that we need to read given the list of
    * columns and row groups.
-   * @param streamList the list of streams avaiable
+   * @param streamList the list of streams available
    * @param indexes the indexes that have been loaded
    * @param includedColumns which columns are needed
    * @param includedRowGroups which row groups are needed
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
index 2458dc9..f3e6184 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
@@ -28,7 +28,7 @@
  * byte is -1 to -128, 1 to 128 literal byte values follow.
  */
 public class RunLengthByteReader {
-  private final InStream input;
+  private InStream input;
   private final byte[] literals = new byte[RunLengthByteWriter.MAX_LITERAL_SIZE];
   private int numLiterals = 0;
@@ -39,6 +39,10 @@
     this.input = input;
   }
 
+  public void setInStream(InStream input) {
+    this.input = input;
+  }
+
   private void readValues(boolean ignoreEof) throws IOException {
     int control = input.read();
     used = 0;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
index cc06b44..9e878f5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
@@ -26,7 +26,7 @@
  * A reader that reads a sequence of integers.
  *
  */
 public class RunLengthIntegerReader implements IntegerReader {
-  private final InStream input;
+  private InStream input;
   private final boolean signed;
   private final long[] literals = new long[RunLengthIntegerWriter.MAX_LITERAL_SIZE];
@@ -122,6 +122,11 @@ public void nextVector(LongColumnVector previous, long previousLen) throws IOExc
   }
 
   @Override
+  public void setInStream(InStream data) {
+    input = data;
+  }
+
+  @Override
   public void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
index 529e98e..835cb46 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
@@ -35,7 +35,7 @@ public class RunLengthIntegerReaderV2 implements IntegerReader {
   public static final Log LOG = LogFactory.getLog(RunLengthIntegerReaderV2.class);
 
-  private final InStream input;
+  private InStream input;
   private final boolean signed;
   private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
   private boolean isRepeating = false;
@@ -382,4 +382,9 @@ public void nextVector(LongColumnVector previous, long previousLen) throws IOExc
       }
     }
   }
+
+  @Override
+  public void setInStream(InStream data) {
+    input = data;
+  }
 }
diff --git ql/src/test/queries/clientpositive/orc_llap.q ql/src/test/queries/clientpositive/orc_llap.q
index 5e912f7..386ed4d 100644
--- ql/src/test/queries/clientpositive/orc_llap.q
+++ ql/src/test/queries/clientpositive/orc_llap.q
@@ -5,6 +5,7 @@ SET hive.llap.io.enabled=false;
 SET hive.exec.orc.default.buffer.size=32768;
 SET hive.exec.orc.default.row.index.stride=1000;
 SET hive.optimize.index.filter=true;
+set hive.auto.convert.join=true;
 
 CREATE TABLE orc_llap(
     ctinyint TINYINT,
@@ -31,17 +32,69 @@ insert into table orc_llap
 select ctinyint + i, csmallint + i, cint + i, cbigint + i, cfloat + i, cdouble + i, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2
 from alltypesorc cross join cross_numbers;
 
-select count(*) from orc_llap;
+SET hive.llap.io.enabled=true;
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp;
+SET hive.llap.io.enabled=false;
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp;
+
+SET hive.llap.io.enabled=true;
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) tmp;
+SET hive.llap.io.enabled=false;
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) tmp;
 
 SET hive.llap.io.enabled=true;
-select sum(hash(*)) from (
-select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp;
-select sum(hash(*)) from (
-select * from orc_llap where cint > 10 and cbigint is not null) tmp;
-select sum(hash(*)) from (
-select cstring2 from orc_llap where cint > 5 and cint < 10) tmp;
-select sum(hash(*)) from (
-select * from orc_llap inner join cross_numbers on csmallint = i) tmp;
-select sum(hash(*)) from (
-select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp;
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) tmp;
+SET hive.llap.io.enabled=false;
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) tmp;
+
+SET hive.llap.io.enabled=true;
+select sum(hash(*)) from (select * from orc_llap inner join cross_numbers on csmallint = i) tmp;
+SET hive.llap.io.enabled=false;
+select sum(hash(*)) from (select * from orc_llap inner join cross_numbers on csmallint = i) tmp;
+
+SET hive.llap.io.enabled=true;
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) tmp;
+SET hive.llap.io.enabled=false;
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) tmp;
+
+SET hive.llap.io.enabled=true;
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp;
+SET hive.llap.io.enabled=false;
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp;
+
+-- multi-stripe test
+insert into table orc_llap
+select ctinyint + i, csmallint + i, cint + i, cbigint + i, cfloat + i, cdouble + i, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2
+from alltypesorc cross join cross_numbers;
+alter table orc_llap concatenate;
+
+SET hive.llap.io.enabled=true;
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp;
+SET hive.llap.io.enabled=false;
+select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp;
+
+SET hive.llap.io.enabled=true;
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) tmp;
+SET hive.llap.io.enabled=false;
+select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) tmp;
+
+SET hive.llap.io.enabled=true;
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) tmp;
+SET hive.llap.io.enabled=false;
+select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) tmp;
+
+SET hive.llap.io.enabled=true;
+select sum(hash(*)) from (select * from orc_llap inner join cross_numbers on csmallint = i) tmp;
+SET hive.llap.io.enabled=false;
+select sum(hash(*)) from (select * from orc_llap inner join cross_numbers on csmallint = i) tmp;
+
+SET hive.llap.io.enabled=true;
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) tmp;
+SET hive.llap.io.enabled=false;
+select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) tmp;
+
+SET hive.llap.io.enabled=true;
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp;
+SET hive.llap.io.enabled=false;
+select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp;
diff --git ql/src/test/results/clientpositive/orc_llap.q.out ql/src/test/results/clientpositive/orc_llap.q.out
index 123edd1..0a52b6f 100644
--- ql/src/test/results/clientpositive/orc_llap.q.out
+++ ql/src/test/results/clientpositive/orc_llap.q.out
@@ -53,7 +53,7 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@alltypesorc
 POSTHOOK: Output: default@cross_numbers
 POSTHOOK: Lineage: cross_numbers.i EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), ]
-Warning: Shuffle Join JOIN[7][tables = [$hdt$_0, $hdt$_1]] in Stage 'Stage-1:MAPRED' is a cross product
+Warning: Map Join MAPJOIN[12][bigTable=?]
in task 'Stage-4:MAPRED' is a cross product PREHOOK: query: insert into table orc_llap select ctinyint + i, csmallint + i, cint + i, cbigint + i, cfloat + i, cdouble + i, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 from alltypesorc cross join cross_numbers @@ -80,69 +80,264 @@ POSTHOOK: Lineage: orc_llap.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchem POSTHOOK: Lineage: orc_llap.ctimestamp1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ] POSTHOOK: Lineage: orc_llap.ctimestamp2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ] POSTHOOK: Lineage: orc_llap.ctinyint EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), (cross_numbers)cross_numbers.FieldSchema(name:i, type:int, comment:null), ] -PREHOOK: query: select count(*) from orc_llap +PREHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp PREHOOK: type: QUERY PREHOOK: Input: default@orc_llap #### A masked pattern was here #### -POSTHOOK: query: select count(*) from orc_llap +POSTHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_llap #### A masked pattern was here #### -122880 -PREHOOK: query: select sum(hash(*)) from ( -select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp +-558222259686 +PREHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp PREHOOK: type: QUERY PREHOOK: Input: default@orc_llap #### A masked pattern was here #### -POSTHOOK: query: select sum(hash(*)) from ( -select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp +POSTHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_llap #### A masked pattern was here #### -558222259686 -PREHOOK: query: select sum(hash(*)) from ( -select * from orc_llap where cint > 10 and cbigint is not null) tmp +PREHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-197609091139 +PREHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) tmp PREHOOK: type: QUERY PREHOOK: Input: default@orc_llap #### A masked pattern was here #### -POSTHOOK: query: select sum(hash(*)) from ( -select * from orc_llap where cint > 10 and cbigint is not null) tmp +POSTHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) tmp POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_llap #### A masked pattern was here #### -197609091139 -PREHOOK: query: select sum(hash(*)) from ( -select cstring2 from orc_llap where cint > 5 and cint < 10) tmp +PREHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) tmp PREHOOK: type: QUERY PREHOOK: Input: default@orc_llap #### A masked pattern was here #### 
-POSTHOOK: query: select sum(hash(*)) from ( -select cstring2 from orc_llap where cint > 5 and cint < 10) tmp +POSTHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) tmp POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_llap #### A masked pattern was here #### NULL -PREHOOK: query: select sum(hash(*)) from ( -select * from orc_llap inner join cross_numbers on csmallint = i) tmp +PREHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +NULL +PREHOOK: query: select sum(hash(*)) from (select * from orc_llap inner join cross_numbers on csmallint = i) tmp PREHOOK: type: QUERY PREHOOK: Input: default@cross_numbers PREHOOK: Input: default@orc_llap #### A masked pattern was here #### -POSTHOOK: query: select sum(hash(*)) from ( -select * from orc_llap inner join cross_numbers on csmallint = i) tmp +POSTHOOK: query: select sum(hash(*)) from (select * from orc_llap inner join cross_numbers on csmallint = i) tmp POSTHOOK: type: QUERY POSTHOOK: Input: default@cross_numbers POSTHOOK: Input: default@orc_llap #### A masked pattern was here #### -3966955638 -PREHOOK: query: select sum(hash(*)) from ( -select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp +PREHOOK: query: select sum(hash(*)) from (select * from orc_llap inner join cross_numbers on csmallint = i) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@cross_numbers +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select * from orc_llap inner join cross_numbers on csmallint = i) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@cross_numbers +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-3966955638 +PREHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-201218541193 +PREHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-201218541193 +PREHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not 
null and o2.cbigint is not null) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-735462183586256 +PREHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-735462183586256 +Warning: Map Join MAPJOIN[12][bigTable=?] in task 'Stage-4:MAPRED' is a cross product +PREHOOK: query: -- multi-stripe test +insert into table orc_llap +select ctinyint + i, csmallint + i, cint + i, cbigint + i, cfloat + i, cdouble + i, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 +from alltypesorc cross join cross_numbers +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Input: default@cross_numbers +PREHOOK: Output: default@orc_llap +POSTHOOK: query: -- multi-stripe test +insert into table orc_llap +select ctinyint + i, csmallint + i, cint + i, cbigint + i, cfloat + i, cdouble + i, cstring1, cstring2, ctimestamp1, ctimestamp2, cboolean1, cboolean2 +from alltypesorc cross join cross_numbers +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Input: default@cross_numbers +POSTHOOK: Output: default@orc_llap +POSTHOOK: Lineage: orc_llap.cbigint EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), (cross_numbers)cross_numbers.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: orc_llap.cboolean1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean1, type:boolean, comment:null), ] +POSTHOOK: Lineage: orc_llap.cboolean2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cboolean2, type:boolean, comment:null), ] +POSTHOOK: Lineage: orc_llap.cdouble EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), (cross_numbers)cross_numbers.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: orc_llap.cfloat EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), (cross_numbers)cross_numbers.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: orc_llap.cint EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), (cross_numbers)cross_numbers.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: orc_llap.csmallint EXPRESSION [(alltypesorc)alltypesorc.FieldSchema(name:csmallint, type:smallint, comment:null), (cross_numbers)cross_numbers.FieldSchema(name:i, type:int, comment:null), ] +POSTHOOK: Lineage: orc_llap.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ] +POSTHOOK: Lineage: orc_llap.cstring2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring2, type:string, comment:null), ] +POSTHOOK: Lineage: orc_llap.ctimestamp1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp1, type:timestamp, comment:null), ] +POSTHOOK: Lineage: orc_llap.ctimestamp2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:ctimestamp2, type:timestamp, comment:null), ] +POSTHOOK: Lineage: orc_llap.ctinyint EXPRESSION 
[(alltypesorc)alltypesorc.FieldSchema(name:ctinyint, type:tinyint, comment:null), (cross_numbers)cross_numbers.FieldSchema(name:i, type:int, comment:null), ] +PREHOOK: query: alter table orc_llap concatenate +PREHOOK: type: ALTER_TABLE_MERGE +PREHOOK: Input: default@orc_llap +PREHOOK: Output: default@orc_llap +POSTHOOK: query: alter table orc_llap concatenate +POSTHOOK: type: ALTER_TABLE_MERGE +POSTHOOK: Input: default@orc_llap +POSTHOOK: Output: default@orc_llap +PREHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-1116444519372 +PREHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select cint, csmallint, cbigint from orc_llap where cint > 10 and cbigint is not null) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-1116444519372 +PREHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-395218182278 +PREHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select * from orc_llap where cint > 10 and cbigint is not null) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-395218182278 +PREHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +NULL +PREHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select cstring2 from orc_llap where cint > 5 and cint < 10) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +NULL +PREHOOK: query: select sum(hash(*)) from (select * from orc_llap inner join cross_numbers on csmallint = i) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@cross_numbers +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select * from orc_llap inner join cross_numbers on csmallint = i) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@cross_numbers +POSTHOOK: Input: default@orc_llap +#### A 
masked pattern was here #### +-7933911276 +PREHOOK: query: select sum(hash(*)) from (select * from orc_llap inner join cross_numbers on csmallint = i) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@cross_numbers +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select * from orc_llap inner join cross_numbers on csmallint = i) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@cross_numbers +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-7933911276 +PREHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-201218541193 +PREHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select cstring1, cstring2, count(*) from orc_llap group by cstring1, cstring2) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-201218418313 +PREHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-2941848734345024 +PREHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp PREHOOK: type: QUERY PREHOOK: Input: default@orc_llap #### A masked pattern was here #### -POSTHOOK: query: select sum(hash(*)) from ( -select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp +POSTHOOK: query: select sum(hash(*)) from (select o1.cstring1, o2.cstring2 from orc_llap o1 inner join orc_llap o2 on o1.csmallint = o2.csmallint where o1.cbigint is not null and o2.cbigint is not null) tmp POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_llap #### A masked pattern was here #### -252929559084752 +-2941848734345024
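For context on the RunLengthByteReader / RunLengthIntegerReader / RunLengthIntegerReaderV2 changes above: dropping final on input and adding setInStream lets a decoder built for one stripe be re-pointed at a different stream instead of being reconstructed, and the @Override annotations suggest the IntegerReader interface gains a matching setInStream declaration elsewhere in this patch. A minimal sketch of that usage follows; the helper class and its name are hypothetical (not code from this patch) and it is assumed to live in the org.apache.hadoop.hive.ql.io.orc package so the package-level types are visible.

    package org.apache.hadoop.hive.ql.io.orc;

    // Hypothetical illustration only; this class does not exist in the patch.
    final class StreamSwapSketch {
      private StreamSwapSketch() {}

      // Re-point existing run-length decoders at freshly materialized streams
      // (for example, streams rebuilt from LLAP-cached buffers) instead of
      // constructing new decoder objects for every stripe.
      static void repoint(IntegerReader dataReader, RunLengthByteReader byteReader,
          InStream newData, InStream newBytes) {
        dataReader.setInStream(newData);   // added to both RunLengthIntegerReader variants
        byteReader.setInStream(newBytes);  // added to RunLengthByteReader
      }
    }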
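Similarly, making the BufferChunk constructor public allows callers outside RecordReaderImpl to wrap an in-memory buffer as a disk range covering [offset, offset + remaining), per the super(offset, offset + chunk.remaining()) call shown above. A minimal sketch under the same assumptions (hypothetical helper, same package; cachedData stands in for whatever buffer the caller obtained):

    package org.apache.hadoop.hive.ql.io.orc;

    import java.nio.ByteBuffer;

    // Hypothetical illustration only; this class does not exist in the patch.
    final class BufferChunkSketch {
      private BufferChunkSketch() {}

      // Wrap a buffer as the file range [fileOffset, fileOffset + cachedData.remaining()),
      // using the constructor made public above.
      static RecordReaderImpl.BufferChunk wrap(ByteBuffer cachedData, long fileOffset) {
        return new RecordReaderImpl.BufferChunk(cachedData, fileOffset);
      }
    }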