diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index 3dfab63..92d48f6 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -6,9 +6,7 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,7 +16,6 @@ package org.apache.hadoop.hive.llap.io.decode; import java.io.IOException; -import java.util.List; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; @@ -28,16 +25,6 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.CompressionCodec; import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; @@ -46,12 +33,16 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import org.apache.orc.TypeDescription; import org.apache.orc.impl.TreeReaderFactory; import org.apache.hadoop.hive.ql.io.orc.WriterImpl; import org.apache.orc.OrcProto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class OrcEncodedDataConsumer - extends EncodedDataConsumer { + extends EncodedDataConsumer { + public static final Logger LOG = LoggerFactory.getLogger(OrcEncodedDataConsumer.class); private TreeReaderFactory.TreeReader[] columnReaders; private int previousStripeIndex = -1; private OrcFileMetadata fileMetadata; // We assume one request is only for one file. 
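The consumer now works against an org.apache.orc.TypeDescription tree instead of walking the raw OrcProto.Type list, which is why the per-type ColumnVector imports above disappear. A minimal sketch of that conversion, assuming the usual layout where Hive writes rows under a root struct at column 0 (the SchemaSketch class name is illustrative only; the conversion call mirrors the getSchema() helper this patch adds to OrcFileMetadata further down):

    import java.util.List;
    import org.apache.orc.OrcProto;
    import org.apache.orc.OrcUtils;
    import org.apache.orc.TypeDescription;

    class SchemaSketch {
      // Equivalent to the OrcFileMetadata.getSchema() helper introduced by this patch.
      static TypeDescription schemaOf(List<OrcProto.Type> types) {
        return OrcUtils.convertTypeFromProtobuf(types, 0);
      }

      public static void main(String[] args) {
        TypeDescription schema =
            TypeDescription.fromString("struct<str:string,mp:map<string,string>>");
        // Flattened ORC ids: 0 = root struct, 1 = str, 2 = mp, 3 = map key, 4 = map value.
        System.out.println(schema.getId() + " .. " + schema.getMaximumId()); // prints "0 .. 4"
      }
    }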
@@ -81,58 +72,6 @@ public void setStripeMetadata(OrcStripeMetadata m) { stripes[m.getStripeIx()] = m; } - private static ColumnVector createColumn(List types, - final int columnId, int batchSize) { - OrcProto.Type type = types.get(columnId); - switch (type.getKind()) { - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case LONG: - case DATE: - return new LongColumnVector(batchSize); - case FLOAT: - case DOUBLE: - return new DoubleColumnVector(batchSize); - case BINARY: - case STRING: - case CHAR: - case VARCHAR: - return new BytesColumnVector(batchSize); - case TIMESTAMP: - return new TimestampColumnVector(batchSize); - case DECIMAL: - return new DecimalColumnVector(batchSize, type.getPrecision(), - type.getScale()); - case STRUCT: { - List subtypeIdxs = type.getSubtypesList(); - ColumnVector[] fieldVector = new ColumnVector[subtypeIdxs.size()]; - for(int i=0; i < fieldVector.length; ++i) { - fieldVector[i] = createColumn(types, subtypeIdxs.get(i), batchSize); - } - return new StructColumnVector(batchSize, fieldVector); - } - case UNION: { - List subtypeIdxs = type.getSubtypesList(); - ColumnVector[] fieldVector = new ColumnVector[subtypeIdxs.size()]; - for(int i=0; i < fieldVector.length; ++i) { - fieldVector[i] = createColumn(types, subtypeIdxs.get(i), batchSize); - } - return new UnionColumnVector(batchSize, fieldVector); - } - case LIST: - return new ListColumnVector(batchSize, createColumn(types, type.getSubtypes(0), batchSize)); - case MAP: - return new MapColumnVector(batchSize, - createColumn(types, type.getSubtypes(0), batchSize), - createColumn(types, type.getSubtypes(1), batchSize)); - default: - throw new IllegalArgumentException("LLAP does not support " + - type.getKind()); - } - } - @Override protected void decodeBatch(OrcEncodedColumnBatch batch, Consumer downstreamConsumer) { @@ -154,13 +93,15 @@ protected void decodeBatch(OrcEncodedColumnBatch batch, } int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1); int batchSize = VectorizedRowBatch.DEFAULT_SIZE; - int numCols = batch.getColumnIxs().length; + TypeDescription schema = fileMetadata.getSchema(); if (columnReaders == null || !sameStripe) { - this.columnReaders = EncodedTreeReaderFactory.createEncodedTreeReader(numCols, - fileMetadata.getTypes(), stripeMetadata.getEncodings(), batch, codec, skipCorrupt); - positionInStreams(columnReaders, batch, numCols, stripeMetadata); + EncodedTreeReaderFactory.StructTreeReader treeReader = + (EncodedTreeReaderFactory.StructTreeReader) EncodedTreeReaderFactory.createEncodedTreeReader(schema, + stripeMetadata.getEncodings(), batch, codec, skipCorrupt); + this.columnReaders = treeReader.getChildReaders(); + positionInStreams(columnReaders, batch, stripeMetadata); } else { - repositionInStreams(this.columnReaders, batch, sameStripe, numCols, stripeMetadata); + repositionInStreams(this.columnReaders, batch, sameStripe, stripeMetadata); } previousStripeIndex = currentStripeIndex; @@ -168,20 +109,16 @@ protected void decodeBatch(OrcEncodedColumnBatch batch, // for last batch in row group, adjust the batch size if (i == maxBatchesRG - 1) { batchSize = (int) (nonNullRowCount % VectorizedRowBatch.DEFAULT_SIZE); - if (batchSize == 0) break; + if (batchSize == 0) { + break; + } } ColumnVectorBatch cvb = cvbPool.take(); assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split. 
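The hand-rolled createColumn() removed above is superseded by letting the schema build the vectors: a few lines below, decodeBatch takes its columns straight from schema.createRowBatch(batchSize). An illustrative sketch, assuming the same TypeDescription API the patch uses (RowBatchSketch and the example schema are made up for the illustration):

    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.TypeDescription;

    class RowBatchSketch {
      public static void main(String[] args) {
        TypeDescription schema =
            TypeDescription.fromString("struct<str:string,mp:map<string,string>>");
        // createRowBatch builds the same vector classes createColumn() assembled by hand:
        // BytesColumnVector for strings, MapColumnVector for maps, nested vectors inside, etc.
        // The root struct is skipped, so cols[] lines up with the top-level columns.
        VectorizedRowBatch batch = schema.createRowBatch(1024);
        System.out.println(batch.cols[0] instanceof BytesColumnVector); // true (str)
        System.out.println(batch.cols[1] instanceof MapColumnVector);   // true (mp)
      }
    }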
cvb.size = batchSize; - List types = fileMetadata.getTypes(); - int[] columnMapping = batch.getColumnIxs(); - for (int idx = 0; idx < batch.getColumnIxs().length; idx++) { - if (cvb.cols[idx] == null) { - // Orc store rows inside a root struct (hive writes it this way). - // When we populate column vectors we skip over the root struct. - cvb.cols[idx] = createColumn(types, columnMapping[idx], batchSize); - } + cvb.cols = schema.createRowBatch(batchSize).cols; + for (int idx = 0; idx < cvb.cols.length; idx++) { cvb.cols[idx].ensureSize(batchSize, false); columnReaders[idx].nextVector(cvb.cols[idx], null, batchSize); } @@ -200,9 +137,9 @@ protected void decodeBatch(OrcEncodedColumnBatch batch, } private void positionInStreams(TreeReaderFactory.TreeReader[] columnReaders, - EncodedColumnBatch batch, int numCols, + EncodedColumnBatch batch, OrcStripeMetadata stripeMetadata) throws IOException { - for (int i = 0; i < numCols; i++) { + for (int i = 0; i < columnReaders.length; i++) { int columnIndex = batch.getColumnIxs()[i]; int rowGroupIndex = batch.getBatchKey().rgIx; OrcProto.RowIndex rowIndex = stripeMetadata.getRowIndexes()[columnIndex]; @@ -212,15 +149,15 @@ private void positionInStreams(TreeReaderFactory.TreeReader[] columnReaders, } private void repositionInStreams(TreeReaderFactory.TreeReader[] columnReaders, - EncodedColumnBatch batch, boolean sameStripe, int numCols, + EncodedColumnBatch batch, boolean sameStripe, OrcStripeMetadata stripeMetadata) throws IOException { - for (int i = 0; i < numCols; i++) { + for (int i = 0; i < columnReaders.length; i++) { int columnIndex = batch.getColumnIxs()[i]; int rowGroupIndex = batch.getBatchKey().rgIx; ColumnStreamData[] streamBuffers = batch.getColumnData()[i]; OrcProto.RowIndex rowIndex = stripeMetadata.getRowIndexes()[columnIndex]; OrcProto.RowIndexEntry rowIndexEntry = rowIndex.getEntry(rowGroupIndex); - ((SettableTreeReader)columnReaders[i]).setBuffers(streamBuffers, sameStripe); + ((SettableTreeReader) columnReaders[i]).setBuffers(streamBuffers, sameStripe); columnReaders[i].seek(new RecordReaderImpl.PositionProviderImpl(rowIndexEntry)); } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 1befba7..a9bb57e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -26,9 +26,10 @@ import java.util.List; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; -import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; +import org.apache.orc.TypeDescription; import org.apache.orc.impl.DataReaderProperties; import org.apache.orc.impl.OrcIndex; +import org.apache.orc.impl.SchemaEvolution; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -134,7 +135,7 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) { private final BufferUsageManager bufferManager; private final Configuration conf; private final FileSplit split; - private List columnIds; + private List includedColumnIds; private final SearchArgument sarg; private final String[] columnNames; private final OrcEncodedDataConsumer consumer; @@ -168,9 +169,9 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff this.bufferManager = bufferManager; this.conf = conf; this.split = split; - this.columnIds = columnIds; - if 
(this.columnIds != null) { - Collections.sort(this.columnIds); + this.includedColumnIds = columnIds; + if (this.includedColumnIds != null) { + Collections.sort(this.includedColumnIds); } this.sarg = sarg; this.columnNames = columnNames; @@ -232,8 +233,8 @@ protected Void performDataRead() throws IOException { fileMetadata = getOrReadFileMetadata(); consumer.setFileMetadata(fileMetadata); validateFileMetadata(); - if (columnIds == null) { - columnIds = createColumnIds(fileMetadata); + if (includedColumnIds == null) { + includedColumnIds = getAllColumnIds(fileMetadata); } // 2. Determine which stripes to read based on the split. @@ -257,7 +258,8 @@ protected Void performDataRead() throws IOException { boolean[] globalIncludes = null; boolean[] sargColumns = null; try { - globalIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), columnIds, true); + globalIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), + includedColumnIds, true); if (sarg != null && stride != 0) { // TODO: move this to a common method int[] filterColumns = RecordReaderImpl.mapSargColumnsToOrcInternalColIdx( @@ -307,7 +309,6 @@ protected Void performDataRead() throws IOException { // Reader creating updates HDFS counters, don't do it here. DataWrapperForOrc dw = new DataWrapperForOrc(); stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY); - stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled()); } catch (Throwable t) { consumer.setError(t); recordReaderTime(startTime); @@ -327,7 +328,6 @@ protected Void performDataRead() throws IOException { } int stripeIx = stripeIxFrom + stripeIxMod; boolean[][] colRgs = null; - boolean[] stripeIncludes = null; OrcStripeMetadata stripeMetadata = null; StripeInformation stripe; try { @@ -338,6 +338,7 @@ protected Void performDataRead() throws IOException { LlapIoImpl.ORC_LOGGER.trace("Reading stripe {}: {}, {}", stripeIx, stripe.getOffset(), stripe.getLength()); colRgs = readState[stripeIxMod]; + LOG.info("readState[{}]: {}", stripeIxMod, Arrays.toString(colRgs)); // We assume that NO_RGS value is only set from SARG filter and for all columns; // intermediate changes for individual columns will unset values in the array. // Skip this case for 0-column read. We could probably special-case it just like we do @@ -345,17 +346,6 @@ protected Void performDataRead() throws IOException { if (colRgs.length > 0 && colRgs[0] == RecordReaderImpl.SargApplier.READ_NO_RGS) continue; - // 6.1. Determine the columns to read (usually the same as requested). - if (cols == null || cols.size() == colRgs.length) { - cols = columnIds; - stripeIncludes = globalIncludes; - } else { - // We are reading subset of the original columns, remove unnecessary bitmasks/etc. - // This will never happen w/o high-level cache. - stripeIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), cols, true); - colRgs = genStripeColRgs(cols, colRgs); - } - // 6.2. Ensure we have stripe metadata. We might have read it before for RG filtering. 
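Two different column spaces meet in this method: includedColumnIds holds top-level column positions (0-based among the root struct's children), while globalIncludes is a boolean over the flattened ORC type ids produced by genIncludedColumns. A rough, hypothetical illustration using the schema from vector_complex_all.q (IncludesSketch is not part of the patch, and the genIncludedColumns behaviour described in the comments is my reading of it):

    import org.apache.orc.TypeDescription;

    class IncludesSketch {
      public static void main(String[] args) {
        TypeDescription schema = TypeDescription.fromString(
            "struct<str:string,mp:map<string,string>,lst:array<string>,strct:struct<a:string,b:string>>");
        // Flattened ids: 0 root, 1 str, 2 mp, 3 key, 4 value, 5 lst, 6 element, 7 strct, 8 a, 9 b.
        // getAllColumnIds(...) would return [0, 1, 2, 3] (one entry per top-level column), while
        // genIncludedColumns(types, [0, 1, 2, 3], true) should mark all ten flattened ids true,
        // which is why readState rows are later sized by globalIncludes.length rather than
        // columnIds.size().
        System.out.println("flattened ids: 0.." + schema.getMaximumId()); // 0..9
      }
    }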
boolean isFoundInCache = false; if (stripeMetadatas != null) { @@ -371,27 +361,27 @@ protected Void performDataRead() throws IOException { ensureMetadataReader(); long startTimeHdfs = counters.startTimeCounter(); stripeMetadata = new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0), - metadataReader, stripe, stripeIncludes, sargColumns); + metadataReader, stripe, globalIncludes, sargColumns); counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTimeHdfs); if (hasFileId && metadataCache != null) { stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata); if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { LlapIoImpl.ORC_LOGGER.trace("Caching stripe {} metadata with includes: {}", - stripeKey.stripeIx, DebugUtils.toString(stripeIncludes)); + stripeKey.stripeIx, DebugUtils.toString(globalIncludes)); } } } consumer.setStripeMetadata(stripeMetadata); } - if (!stripeMetadata.hasAllIndexes(stripeIncludes)) { + if (!stripeMetadata.hasAllIndexes(globalIncludes)) { if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) { LlapIoImpl.ORC_LOGGER.trace("Updating indexes in stripe {} metadata for includes: {}", - stripeKey.stripeIx, DebugUtils.toString(stripeIncludes)); + stripeKey.stripeIx, DebugUtils.toString(globalIncludes)); } assert isFoundInCache; counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS); ensureMetadataReader(); - updateLoadedIndexes(stripeMetadata, stripe, stripeIncludes, sargColumns); + updateLoadedIndexes(stripeMetadata, stripe, globalIncludes, sargColumns); } else if (isFoundInCache) { counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT); } @@ -415,7 +405,7 @@ protected Void performDataRead() throws IOException { // Also, currently readEncodedColumns is not stoppable. The consumer will discard the // data it receives for one stripe. We could probably interrupt it, if it checked that. stripeReader.readEncodedColumns(stripeIx, stripe, stripeMetadata.getRowIndexes(), - stripeMetadata.getEncodings(), stripeMetadata.getStreams(), stripeIncludes, + stripeMetadata.getEncodings(), stripeMetadata.getStreams(), globalIncludes, colRgs, consumer); } catch (Throwable t) { consumer.setError(t); @@ -526,9 +516,11 @@ private static Object determineFileId(FileSystem fs, FileSplit split, /** * Puts all column indexes from metadata to make a column list to read all column. */ - private static List createColumnIds(OrcFileMetadata metadata) { - List columnIds = new ArrayList(metadata.getTypes().size()); - for (int i = 1; i < metadata.getTypes().size(); ++i) { + private static List getAllColumnIds(OrcFileMetadata metadata) { + int rootColumn = OrcInputFormat.getRootColumn(true); + List types = metadata.getTypes().get(rootColumn).getSubtypesList(); + List columnIds = new ArrayList(types.size()); + for (int i = 0; i < types.size(); ++i) { columnIds.add(i); } return columnIds; @@ -732,8 +724,8 @@ private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride, } } assert isAll || isNone || rgsToRead.length == rgCount; - readState[stripeIxMod] = new boolean[columnIds.size()][]; - for (int j = 0; j < columnIds.size(); ++j) { + readState[stripeIxMod] = new boolean[globalIncludes.length][]; + for (int j = 0; j < globalIncludes.length; ++j) { readState[stripeIxMod][j] = (isAll || isNone) ? 
rgsToRead : Arrays.copyOf(rgsToRead, rgsToRead.length); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java index c9b0a4d..70cba05 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java @@ -32,7 +32,9 @@ import org.apache.orc.CompressionKind; import org.apache.orc.FileMetadata; import org.apache.orc.OrcProto; +import org.apache.orc.OrcUtils; import org.apache.orc.StripeInformation; +import org.apache.orc.TypeDescription; import org.apache.orc.impl.ReaderImpl; /** ORC file metadata. Currently contains some duplicate info due to how different parts @@ -222,4 +224,8 @@ public long getNumberOfRows() { public List getFileStats() { return fileStats; } + + public TypeDescription getSchema() { + return OrcUtils.convertTypeFromProtobuf(this.types, 0); + } } diff --git a/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java index c4a2093..a048dba 100644 --- a/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java +++ b/orc/src/java/org/apache/orc/impl/TreeReaderFactory.java @@ -1699,7 +1699,7 @@ public void nextVector(ColumnVector previousVector, } } - protected static class StructTreeReader extends TreeReader { + public static class StructTreeReader extends TreeReader { protected final TreeReader[] fields; protected StructTreeReader(int columnId, @@ -1717,6 +1717,19 @@ protected StructTreeReader(int columnId, } } + public TreeReader[] getChildReaders() { + return fields; + } + + protected StructTreeReader(int columnId, InStream present, + OrcProto.ColumnEncoding encoding, TreeReader[] childReaders) throws IOException { + super(columnId, present); + if (encoding != null) { + checkEncoding(encoding); + } + this.fields = childReaders; + } + @Override void seek(PositionProvider[] index) throws IOException { super.seek(index); @@ -1875,6 +1888,16 @@ protected ListTreeReader(int fileColumn, skipCorrupt); } + protected ListTreeReader(int columnId, InStream present, InStream data, + OrcProto.ColumnEncoding encoding, TreeReader elementReader) throws IOException { + super(columnId, present); + if (data != null && encoding != null) { + checkEncoding(encoding); + this.lengths = createIntegerReader(encoding.getKind(), data, false, false); + } + this.elementReader = elementReader; + } + @Override void seek(PositionProvider[] index) throws IOException { super.seek(index); @@ -1956,6 +1979,18 @@ protected MapTreeReader(int fileColumn, valueReader = createTreeReader(valueType, evolution, included, skipCorrupt); } + protected MapTreeReader(int columnId, InStream present, InStream data, + OrcProto.ColumnEncoding encoding, TreeReader keyReader, TreeReader valueReader) + throws IOException { + super(columnId, present); + if (data != null && encoding != null) { + checkEncoding(encoding); + this.lengths = createIntegerReader(encoding.getKind(), data, false, false); + } + this.keyReader = keyReader; + this.valueReader = valueReader; + } + @Override void seek(PositionProvider[] index) throws IOException { super.seek(index); diff --git a/pom.xml b/pom.xml index 63a5ae1..b690c47 100644 --- a/pom.xml +++ b/pom.xml @@ -173,7 +173,7 @@ 1.0.1 1.7.10 4.0.4 - 0.8.3 + 0.8.4-SNAPSHOT 0.90.2-incubating 2.2.0 1.6.0 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 69d58d6..ebc5a2a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -279,7 +279,7 @@ public SerDeStats getStats() { * @param isOriginal is the file in the original format? * @return the column number for the root of row. */ - static int getRootColumn(boolean isOriginal) { + public static int getRootColumn(boolean isOriginal) { return isOriginal ? 0 : (OrcRecordUpdater.ROW + 1); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java index 4d09dcd..1ff3503 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java @@ -49,10 +49,4 @@ void readEncodedColumns(int stripeIx, StripeInformation stripe, */ void close() throws IOException; - /** - * Controls the low-level debug tracing. (Hopefully) allows for optimization where tracing - * checks are entirely eliminated because this method is called with constant value, similar - * to just checking the constant in the first place. - */ - void setTracing(boolean isEnabled); } \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index dad35e3..54e8028 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.slf4j.Logger; @@ -78,6 +79,7 @@ */ class EncodedReaderImpl implements EncodedReader { public static final Logger LOG = LoggerFactory.getLogger(EncodedReaderImpl.class); + public static final Logger ORC_LOGGER = LoggerFactory.getLogger("LlapIoOrc"); private static final Object POOLS_CREATION_LOCK = new Object(); private static Pools POOLS; private static class Pools { @@ -102,7 +104,7 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end private final List types; private final long rowIndexStride; private final DataCache cacheWrapper; - private boolean isTracingEnabled; + private boolean isTracingEnabled = ORC_LOGGER.isTraceEnabled(); public EncodedReaderImpl(Object fileKey, List types, CompressionCodec codec, int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader, @@ -208,13 +210,13 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, // 1.1. Figure out which columns have a present stream boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types); if (isTracingEnabled) { - LOG.trace("The following columns have PRESENT streams: " + arrayToString(hasNull)); + ORC_LOGGER.trace("The following columns have PRESENT streams: " + arrayToString(hasNull)); } // We assume stream list is sorted by column and that non-data // streams do not interleave data streams for the same column. // 1.2. With that in mind, determine disk ranges to read/get from cache (not by stream). 
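Tracing here moves from the setTracing() hook (removed from the EncodedReader interface above) to a dedicated named logger whose trace level is sampled once into isTracingEnabled. A minimal sketch of the pattern, assuming plain slf4j (TracingSketch and readSomething are illustrative names only):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class TracingSketch {
      private static final Logger ORC_LOGGER = LoggerFactory.getLogger("LlapIoOrc");
      // Sampled once at construction; changing the log level afterwards is not picked up.
      private final boolean isTracingEnabled = ORC_LOGGER.isTraceEnabled();

      void readSomething(long offset, long length) {
        if (isTracingEnabled) {
          ORC_LOGGER.trace("Reading stream at " + offset + ", " + length);
        }
      }
    }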
- int colRgIx = -1, lastColIx = -1; + int lastColIx = -1; ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length]; boolean[] includedRgs = null; boolean isCompressed = (codec != null); @@ -227,39 +229,42 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, if (!included[colIx] || StreamName.getArea(streamKind) != StreamName.Area.DATA) { // We have a stream for included column, but in future it might have no data streams. // It's more like "has at least one column included that has an index stream". - hasIndexOnlyCols = hasIndexOnlyCols | included[colIx]; + hasIndexOnlyCols = hasIndexOnlyCols || included[colIx]; if (isTracingEnabled) { - LOG.trace("Skipping stream: " + streamKind + " at " + offset + ", " + length); + ORC_LOGGER.trace("Skipping stream: " + streamKind + " at " + offset + ", " + length); } offset += length; continue; } ColumnReadContext ctx = null; if (lastColIx != colIx) { - ++colRgIx; - assert colCtxs[colRgIx] == null; + assert colCtxs[colIx] == null; lastColIx = colIx; - includedRgs = colRgs[colRgIx]; - ctx = colCtxs[colRgIx] = new ColumnReadContext( + LOG.info("1) lastColIx: {} colIx: {} colRgIx: {} colRgs[{}]: {}", lastColIx, colIx, colIx, + colIx, Arrays.toString(colRgs[colIx])); + includedRgs = colRgs[colIx]; + ctx = colCtxs[colIx] = new ColumnReadContext( colIx, encodings.get(colIx), indexes[colIx]); if (isTracingEnabled) { - LOG.trace("Creating context " + colRgIx + " for column " + colIx + ":" + ctx.toString()); + ORC_LOGGER.trace("Creating context " + colIx + " for column " + colIx + ":" + ctx.toString()); } } else { - ctx = colCtxs[colRgIx]; + ctx = colCtxs[colIx]; + LOG.info("2) lastColIx: {} colIx: {} colRgIx: {} colRgs[{}]: {}", lastColIx, colIx, colIx, + colIx, Arrays.toString(colRgs[colIx])); assert ctx != null; } int indexIx = RecordReaderUtils.getIndexPosition(ctx.encoding.getKind(), types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]); ctx.addStream(offset, stream, indexIx); if (isTracingEnabled) { - LOG.trace("Adding stream for column " + colIx + ": " + streamKind + " at " + offset + ORC_LOGGER.trace("Adding stream for column " + colIx + ": " + streamKind + " at " + offset + ", " + length + ", index position " + indexIx); } if (includedRgs == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(colIx))) { RecordReaderUtils.addEntireStreamToRanges(offset, length, listToRead, true); if (isTracingEnabled) { - LOG.trace("Will read whole stream " + streamKind + "; added to " + listToRead.getTail()); + ORC_LOGGER.trace("Will read whole stream " + streamKind + "; added to " + listToRead.getTail()); } } else { RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRgs, @@ -269,6 +274,14 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, offset += length; } + // handle null data streams + for (int i = 0; i < lastColIx; i++) { + if (colCtxs[i] == null) { + LOG.info("Creating column read context for column id: {}", i); + colCtxs[i] = new ColumnReadContext(i, encodings.get(i), indexes[i]); + } + } + boolean hasFileId = this.fileKey != null; if (listToRead.get() == null) { // No data to read for this stripe. Check if we have some included index-only columns. @@ -286,14 +299,14 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, // 2. Now, read all of the ranges from cache or disk. 
DiskRangeList.MutateHelper toRead = new DiskRangeList.MutateHelper(listToRead.get()); if (isTracingEnabled && LOG.isInfoEnabled()) { - LOG.trace("Resulting disk ranges to read (file " + fileKey + "): " + ORC_LOGGER.trace("Resulting disk ranges to read (file " + fileKey + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); } BooleanRef isAllInCache = new BooleanRef(); if (hasFileId) { cacheWrapper.getFileData(fileKey, toRead.next, stripeOffset, CC_FACTORY, isAllInCache); if (isTracingEnabled && LOG.isInfoEnabled()) { - LOG.trace("Disk ranges after cache (file " + fileKey + ", base offset " + stripeOffset + ORC_LOGGER.trace("Disk ranges after cache (file " + fileKey + ", base offset " + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); } } @@ -321,7 +334,7 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, } } if (isTracingEnabled) { - LOG.trace("Disk ranges after pre-read (file " + fileKey + ", base offset " + ORC_LOGGER.trace("Disk ranges after pre-read (file " + fileKey + ", base offset " + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); } iter = toRead.next; // Reset the iter to start. @@ -341,9 +354,14 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) { // RG x col filtered. isRGSelected = false; + LOG.info("colIxMod: {} rgIx: {} colRgs[{}]: {} colRgs[{}][{}]: {}", colIxMod, rgIx, colIxMod, + Arrays.toString(colRgs[colIxMod]), colIxMod, rgIx, colRgs[colIxMod][rgIx]); continue; } ColumnReadContext ctx = colCtxs[colIxMod]; + if (ctx == null) continue; // no data streams present for this column + LOG.info("ctx: {} rgIx: {} isLastRg: {} rgCount: {} colIxMod: {}", ctx, + rgIx, isLastRg, rgCount, colIxMod); OrcProto.RowIndexEntry index = ctx.rowIndex.getEntry(rgIx), nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1); ecb.initColumn(colIxMod, ctx.colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS); @@ -353,7 +371,7 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) { // This stream is for entire stripe and needed for every RG; uncompress once and reuse. if (isTracingEnabled) { - LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for" + ORC_LOGGER.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for" + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length); } if (sctx.stripeLevelStream == null) { @@ -410,7 +428,7 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, } if (isTracingEnabled) { - LOG.trace("Disk ranges after preparing all the data " + ORC_LOGGER.trace("Disk ranges after preparing all the data " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); } @@ -436,7 +454,7 @@ private ColumnStreamData createRgColumnStreamData(int rgIx, boolean isLastRg, ColumnStreamData cb = POOLS.csdPool.take(); cb.incRef(); if (isTracingEnabled) { - LOG.trace("Getting data for column "+ colIx + " " + (isLastRg ? "last " : "") + ORC_LOGGER.trace("Getting data for column "+ colIx + " " + (isLastRg ? "last " : "") + "RG " + rgIx + " stream " + sctx.kind + " at " + sctx.offset + ", " + sctx.length + " index position " + sctx.streamIndexOffset + ": " + (isCompressed ? 
"" : "un") + "compressed [" + cOffset + ", " + endCOffset + ")"); @@ -458,12 +476,6 @@ private void releaseInitialRefcounts(DiskRangeList current) { } @Override - public void setTracing(boolean isEnabled) { - this.isTracingEnabled = isEnabled; - } - - - @Override public void close() throws IOException { dataReader.close(); } @@ -606,7 +618,7 @@ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, lon // to return to a previous block. DiskRangeList current = findExactPosition(start, cOffset); if (isTracingEnabled) { - LOG.trace("Starting read for [" + cOffset + "," + endCOffset + ") at " + current); + ORC_LOGGER.trace("Starting read for [" + cOffset + "," + endCOffset + ") at " + current); } CacheChunk lastUncompressed = null; @@ -654,7 +666,7 @@ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, lon chunk.originalData = null; if (isTracingEnabled) { - LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)"); + ORC_LOGGER.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)"); } cacheWrapper.reuseBuffer(chunk.getBuffer()); } @@ -699,13 +711,13 @@ private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset, // 2a. This is a decoded compression buffer, add as is. CacheChunk cc = (CacheChunk)current; if (isTracingEnabled) { - LOG.trace("Locking " + cc.getBuffer() + " due to reuse"); + ORC_LOGGER.trace("Locking " + cc.getBuffer() + " due to reuse"); } cacheWrapper.reuseBuffer(cc.getBuffer()); columnStreamData.getCacheBuffers().add(cc.getBuffer()); currentOffset = cc.getEnd(); if (isTracingEnabled) { - LOG.trace("Adding an already-uncompressed buffer " + cc.getBuffer()); + ORC_LOGGER.trace("Adding an already-uncompressed buffer " + cc.getBuffer()); } ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, cc); lastUncompressed = cc; @@ -713,7 +725,7 @@ private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset, } else if (current instanceof IncompleteCb) { // 2b. This is a known incomplete CB caused by ORC CB end boundaries being estimates. if (isTracingEnabled) { - LOG.trace("Cannot read " + current); + ORC_LOGGER.trace("Cannot read " + current); } next = null; currentOffset = -1; @@ -747,7 +759,7 @@ private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffse assert current instanceof CacheChunk; lastUncompressed = (CacheChunk)current; if (isTracingEnabled) { - LOG.trace("Locking " + lastUncompressed.getBuffer() + " due to reuse"); + ORC_LOGGER.trace("Locking " + lastUncompressed.getBuffer() + " due to reuse"); } cacheWrapper.reuseBuffer(lastUncompressed.getBuffer()); if (isFirst) { @@ -757,7 +769,7 @@ private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffse columnStreamData.getCacheBuffers().add(lastUncompressed.getBuffer()); currentOffset = lastUncompressed.getEnd(); if (isTracingEnabled) { - LOG.trace("Adding an uncompressed buffer " + lastUncompressed.getBuffer()); + ORC_LOGGER.trace("Adding an uncompressed buffer " + lastUncompressed.getBuffer()); } ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, lastUncompressed); next = current.next; @@ -787,7 +799,7 @@ private DiskRangeList preReadUncompressedStream(long baseOffset, // 1. Find our bearings in the stream. 
DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd); if (isTracingEnabled) { - LOG.trace("Starting pre-read for [" + streamOffset + "," + streamEnd + ") at " + current); + ORC_LOGGER.trace("Starting pre-read for [" + streamOffset + "," + streamEnd + ") at " + current); } if (streamOffset > current.getOffset()) { @@ -843,7 +855,7 @@ private DiskRangeList preReadUncompressedStream(long baseOffset, wasSplit = true; } if (isTracingEnabled) { - LOG.trace("Processing uncompressed file data at [" + ORC_LOGGER.trace("Processing uncompressed file data at [" + current.getOffset() + ", " + current.getEnd() + ")"); } BufferChunk curBc = (BufferChunk)current; @@ -1065,7 +1077,7 @@ private void ponderReleaseInitialRefcount( private void releaseInitialRefcount(CacheChunk cc, boolean isBacktracking) { // This is the last RG for which this buffer will be used. Remove the initial refcount if (isTracingEnabled) { - LOG.trace("Unlocking " + cc.getBuffer() + " for the fetching thread" + ORC_LOGGER.trace("Unlocking " + cc.getBuffer() + " for the fetching thread" + (isBacktracking ? "; backtracking" : "")); } cacheWrapper.releaseBuffer(cc.getBuffer()); @@ -1088,7 +1100,7 @@ private void processCacheCollisions(long[] collisionMask, CacheChunk replacedChunk = toDecompress.get(i); MemoryBuffer replacementBuffer = targetBuffers[i]; if (isTracingEnabled) { - LOG.trace("Discarding data due to cache collision: " + replacedChunk.getBuffer() + ORC_LOGGER.trace("Discarding data due to cache collision: " + replacedChunk.getBuffer() + " replaced with " + replacementBuffer); } assert replacedChunk.getBuffer() != replacementBuffer : i + " was not replaced in the results " @@ -1164,7 +1176,7 @@ private ProcCacheChunk addOneCompressionBuffer(BufferChunk current, long cbEndOffset = cbStartOffset + consumedLength; boolean isUncompressed = ((b0 & 0x01) == 1); if (isTracingEnabled) { - LOG.trace("Found CB at " + cbStartOffset + ", chunk length " + chunkLength + ", total " + ORC_LOGGER.trace("Found CB at " + cbStartOffset + ", chunk length " + chunkLength + ", total " + consumedLength + ", " + (isUncompressed ? "not " : "") + "compressed"); } if (compressed.remaining() >= chunkLength) { @@ -1190,7 +1202,7 @@ private ProcCacheChunk addOneCompressionBuffer(BufferChunk current, int originalPos = compressed.position(); copy.put(compressed); if (isTracingEnabled) { - LOG.trace("Removing partial CB " + current + " from ranges after copying its contents"); + ORC_LOGGER.trace("Removing partial CB " + current + " from ranges after copying its contents"); } DiskRangeList next = current.next; current.removeSelf(); @@ -1230,7 +1242,7 @@ private ProcCacheChunk addOneCompressionBuffer(BufferChunk current, next = next.hasContiguousNext() ? 
next.next : null; if (next != null) { if (isTracingEnabled) { - LOG.trace("Removing partial CB " + tmp + " from ranges after copying its contents"); + ORC_LOGGER.trace("Removing partial CB " + tmp + " from ranges after copying its contents"); } tmp.removeSelf(); } else { @@ -1244,7 +1256,7 @@ private IncompleteCb addIncompleteCompressionBuffer( long cbStartOffset, DiskRangeList target, int extraChunkCount) { IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd()); if (isTracingEnabled) { - LOG.trace("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with " + ORC_LOGGER.trace("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with " + icb + " in the buffers"); } target.replaceSelfWith(icb); @@ -1277,19 +1289,19 @@ private ProcCacheChunk addOneCompressionBlockByteBuffer(ByteBuffer fullCompressi toDecompress.add(cc); // Adjust the compression block position. if (isTracingEnabled) { - LOG.trace("Adjusting " + lastChunk + " to consume " + lastChunkLength + " compressed bytes"); + ORC_LOGGER.trace("Adjusting " + lastChunk + " to consume " + lastChunkLength + " compressed bytes"); } lastChunk.getChunk().position(lastChunk.getChunk().position() + lastChunkLength); // Finally, put it in the ranges list for future use (if shared between RGs). // Before anyone else accesses it, it would have been allocated and decompressed locally. if (lastChunk.getChunk().remaining() <= 0) { if (isTracingEnabled) { - LOG.trace("Replacing " + lastChunk + " with " + cc + " in the buffers"); + ORC_LOGGER.trace("Replacing " + lastChunk + " with " + cc + " in the buffers"); } lastChunk.replaceSelfWith(cc); } else { if (isTracingEnabled) { - LOG.trace("Adding " + cc + " before " + lastChunk + " in the buffers"); + ORC_LOGGER.trace("Adding " + cc + " before " + lastChunk + " in the buffers"); } lastChunk.insertPartBefore(cc); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java index b44da06..6646a93 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java @@ -18,17 +18,23 @@ package org.apache.hadoop.hive.ql.io.orc.encoded; import java.io.IOException; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; import org.apache.orc.CompressionCodec; +import org.apache.orc.TypeDescription; import org.apache.orc.impl.PositionProvider; +import org.apache.orc.impl.SchemaEvolution; import org.apache.orc.impl.SettableUncompressedStream; import org.apache.orc.impl.TreeReaderFactory; import org.apache.orc.OrcProto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class EncodedTreeReaderFactory extends TreeReaderFactory { + public static final Logger LOG = LoggerFactory.getLogger(EncodedTreeReaderFactory.class); /** * We choose to use a toy programming language, so we cannot use multiple inheritance. * If we could, we could have this inherit TreeReader to contain the common impl, and then @@ -200,7 +206,7 @@ public void seek(PositionProvider index) throws IOException { // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. 
- if (_dataStream.available() > 0) { + if (_dataStream != null && _dataStream.available() > 0) { if (_isFileCompressed) { index.getNext(); } @@ -211,14 +217,14 @@ public void seek(PositionProvider index) throws IOException { // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. - if (_dataStream.available() > 0) { + if (_dataStream != null && _dataStream.available() > 0) { if (_isFileCompressed) { index.getNext(); } ((StringDirectTreeReader) reader).getStream().seek(index); } - if (_lengthStream.available() > 0) { + if (_lengthStream != null && _lengthStream.available() > 0) { if (_isFileCompressed) { index.getNext(); } @@ -1662,16 +1668,335 @@ public static StreamReaderBuilder builder() { } } - public static TreeReader[] createEncodedTreeReader(int numCols, - List types, + protected static class ListStreamReader extends ListTreeReader implements SettableTreeReader{ + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _lengthStream; + + public ListStreamReader(final int columnIndex, + final SettableUncompressedStream present, final SettableUncompressedStream lengthStream, + final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed, + final TreeReader elementReader) throws IOException { + super(columnIndex, present, lengthStream, columnEncoding, elementReader); + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + this._lengthStream = lengthStream; + } + + @Override + public void seek(PositionProvider index) throws IOException { + if (present != null) { + if (_isFileCompressed) { + index.getNext(); + } + present.seek(index); + } + + // lengths stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. 
+ if (_lengthStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + elementReader.seek(index); + } + } + + @Override + public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + throws IOException { + if (_presentStream != null) { + _presentStream.setBuffers( + StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); + } + if (_lengthStream != null) { + _lengthStream.setBuffers( + StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.LENGTH_VALUE])); + } + + if (elementReader != null) { + ((SettableTreeReader) elementReader).setBuffers(streamsData, sameStripe); + } + } + + public static class StreamReaderBuilder { + private int columnIndex; + private ColumnStreamData presentStream; + private ColumnStreamData lengthStream; + private CompressionCodec compressionCodec; + private OrcProto.ColumnEncoding columnEncoding; + private TreeReader elementReader; + + + public ListStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) { + this.columnIndex = columnIndex; + return this; + } + + public ListStreamReader.StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) { + this.lengthStream = lengthStream; + return this; + } + + public ListStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) { + this.presentStream = presentStream; + return this; + } + + public ListStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { + this.columnEncoding = encoding; + return this; + } + + public ListStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) { + this.compressionCodec = compressionCodec; + return this; + } + + public ListStreamReader.StreamReaderBuilder setElementReader(TreeReader elementReader) { + this.elementReader = elementReader; + return this; + } + + public ListStreamReader build() throws IOException { + SettableUncompressedStream present = StreamUtils + .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(), + presentStream); + + SettableUncompressedStream length = StreamUtils + .createSettableUncompressedStream(OrcProto.Stream.Kind.LENGTH.name(), + lengthStream); + + boolean isFileCompressed = compressionCodec != null; + return new ListStreamReader(columnIndex, present, length, columnEncoding, isFileCompressed, + elementReader); + } + } + + public static ListStreamReader.StreamReaderBuilder builder() { + return new ListStreamReader.StreamReaderBuilder(); + } + } + + protected static class MapStreamReader extends MapTreeReader implements SettableTreeReader{ + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _lengthStream; + + public MapStreamReader(final int columnIndex, + final SettableUncompressedStream present, final SettableUncompressedStream lengthStream, + final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed, + final TreeReader keyReader, final TreeReader valueReader) throws IOException { + super(columnIndex, present, lengthStream, columnEncoding, keyReader, valueReader); + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + this._lengthStream = lengthStream; + } + + @Override + public void seek(PositionProvider index) throws IOException { + if (present != null) { + if (_isFileCompressed) { + index.getNext(); + } + present.seek(index); + } + + // lengths stream could be empty stream or already reached end of stream before present stream. 
+ // This can happen if all values in stream are nulls or last row group values are all null. + if (_lengthStream.available() > 0) { + if (_isFileCompressed) { + index.getNext(); + } + keyReader.seek(index); + valueReader.seek(index); + } + } + + @Override + public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + throws IOException { + if (_presentStream != null) { + _presentStream.setBuffers( + StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); + } + if (_lengthStream != null) { + _lengthStream.setBuffers( + StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.LENGTH_VALUE])); + } + + if (keyReader != null) { + ((SettableTreeReader) keyReader).setBuffers(streamsData, sameStripe); + } + + if (valueReader != null) { + ((SettableTreeReader) valueReader).setBuffers(streamsData, sameStripe); + } + } + + public static class StreamReaderBuilder { + private int columnIndex; + private ColumnStreamData presentStream; + private ColumnStreamData lengthStream; + private CompressionCodec compressionCodec; + private OrcProto.ColumnEncoding columnEncoding; + private TreeReader keyReader; + private TreeReader valueReader; + + + public MapStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) { + this.columnIndex = columnIndex; + return this; + } + + public MapStreamReader.StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) { + this.lengthStream = lengthStream; + return this; + } + + public MapStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) { + this.presentStream = presentStream; + return this; + } + + public MapStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { + this.columnEncoding = encoding; + return this; + } + + public MapStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) { + this.compressionCodec = compressionCodec; + return this; + } + + public MapStreamReader.StreamReaderBuilder setKeyReader(TreeReader keyReader) { + this.keyReader = keyReader; + return this; + } + + public MapStreamReader.StreamReaderBuilder setValueReader(TreeReader valueReader) { + this.valueReader = valueReader; + return this; + } + + public MapStreamReader build() throws IOException { + SettableUncompressedStream present = StreamUtils + .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(), + presentStream); + + SettableUncompressedStream length = StreamUtils + .createSettableUncompressedStream(OrcProto.Stream.Kind.LENGTH.name(), + lengthStream); + + boolean isFileCompressed = compressionCodec != null; + return new MapStreamReader(columnIndex, present, length, columnEncoding, isFileCompressed, + keyReader, valueReader); + } + } + + public static MapStreamReader.StreamReaderBuilder builder() { + return new MapStreamReader.StreamReaderBuilder(); + } + } + + protected static class StructStreamReader extends StructTreeReader implements SettableTreeReader{ + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + + public StructStreamReader(final int columnIndex, + final SettableUncompressedStream present, + final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed, + final TreeReader[] childReaders) throws IOException { + super(columnIndex, present, columnEncoding, childReaders); + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + } + + @Override + public void seek(PositionProvider index) throws IOException { + if (present != null) { + if 
(_isFileCompressed) { + index.getNext(); + } + present.seek(index); + } + if (fields != null) { + for (TreeReader child : fields) { + child.seek(index); + } + } + } + + @Override + public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + throws IOException { + if (_presentStream != null) { + _presentStream.setBuffers( + StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); + } + if (fields != null) { + for (TreeReader child : fields) { + ((SettableTreeReader) child).setBuffers(streamsData, sameStripe); + } + } + } + + public static class StreamReaderBuilder { + private int columnIndex; + private ColumnStreamData presentStream; + private CompressionCodec compressionCodec; + private OrcProto.ColumnEncoding columnEncoding; + private TreeReader[] childReaders; + + + public StructStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) { + this.columnIndex = columnIndex; + return this; + } + + public StructStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) { + this.presentStream = presentStream; + return this; + } + + public StructStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { + this.columnEncoding = encoding; + return this; + } + + public StructStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) { + this.compressionCodec = compressionCodec; + return this; + } + + public StructStreamReader.StreamReaderBuilder setChildReaders(TreeReader[] childReaders) { + this.childReaders = childReaders; + return this; + } + + public StructStreamReader build() throws IOException { + SettableUncompressedStream present = StreamUtils + .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(), + presentStream); + + boolean isFileCompressed = compressionCodec != null; + return new StructStreamReader(columnIndex, present, columnEncoding, isFileCompressed, + childReaders); + } + } + + public static StructStreamReader.StreamReaderBuilder builder() { + return new StructStreamReader.StreamReaderBuilder(); + } + } + + public static TreeReader createEncodedTreeReader(TypeDescription schema, List encodings, EncodedColumnBatch batch, CompressionCodec codec, boolean skipCorrupt) throws IOException { - TreeReader[] treeReaders = new TreeReader[numCols]; - for (int i = 0; i < numCols; i++) { - int columnIndex = batch.getColumnIxs()[i]; - ColumnStreamData[] streamBuffers = batch.getColumnData()[i]; - OrcProto.Type columnType = types.get(columnIndex); + int columnIndex = schema.getId(); + ColumnStreamData[] streamBuffers = batch.getColumnData()[columnIndex]; + TypeDescription columnType = schema.getChildren().get(columnIndex); // EncodedColumnBatch is already decompressed, we don't really need to pass codec. // But we need to know if the original data is compressed or not. 
This is used to skip @@ -1688,149 +2013,219 @@ public static StreamReaderBuilder builder() { lengths = streamBuffers[OrcProto.Stream.Kind.LENGTH_VALUE], secondary = streamBuffers[OrcProto.Stream.Kind.SECONDARY_VALUE]; - switch (columnType.getKind()) { + LOG.info("numCols: {} columnIxs: {} columnIndex: {} columnType: {} streamBuffers.length: {}" + + " columnEncoding: {} present: {} data: {} dictionary: {} lengths: {} secondary: {}", + schema.getMaximumId(), Arrays.toString(batch.getColumnIxs()), columnIndex, columnType, + streamBuffers.length, columnEncoding, present != null, + data != null, dictionary != null, lengths != null, secondary != null); + switch (columnType.getCategory()) { case BINARY: - treeReaders[i] = BinaryStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setLengthStream(lengths) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .build(); - break; case BOOLEAN: - treeReaders[i] = BooleanStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setCompressionCodec(codec) - .build(); - break; case BYTE: - treeReaders[i] = ByteStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setCompressionCodec(codec) - .build(); - break; case SHORT: - treeReaders[i] = ShortStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .build(); - break; case INT: - treeReaders[i] = IntStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .build(); - break; case LONG: - treeReaders[i] = LongStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .skipCorrupt(skipCorrupt) - .build(); - break; case FLOAT: - treeReaders[i] = FloatStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setCompressionCodec(codec) - .build(); - break; case DOUBLE: - treeReaders[i] = DoubleStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setCompressionCodec(codec) - .build(); - break; case CHAR: - treeReaders[i] = CharStreamReader.builder() - .setColumnIndex(columnIndex) - .setMaxLength(columnType.getMaximumLength()) - .setPresentStream(present) - .setDataStream(data) - .setLengthStream(lengths) - .setDictionaryStream(dictionary) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .build(); - break; case VARCHAR: - treeReaders[i] = VarcharStreamReader.builder() - .setColumnIndex(columnIndex) - .setMaxLength(columnType.getMaximumLength()) - .setPresentStream(present) - .setDataStream(data) - .setLengthStream(lengths) - .setDictionaryStream(dictionary) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .build(); - break; case STRING: - treeReaders[i] = StringStreamReader.builder() + case DECIMAL: + case TIMESTAMP: + case DATE: + return getPrimitiveTreeReaders(columnIndex, columnType, codec, columnEncoding, + present, data, dictionary, lengths, secondary, skipCorrupt); + case LIST: + TypeDescription elementType = schema.getChildren().get(columnIndex).getChildren().get(0); + TreeReader elementReader = createEncodedTreeReader(elementType, 
encodings, batch, codec, + skipCorrupt); + return ListStreamReader.builder() .setColumnIndex(columnIndex) + .setColumnEncoding(columnEncoding) + .setCompressionCodec(codec) .setPresentStream(present) - .setDataStream(data) .setLengthStream(lengths) - .setDictionaryStream(dictionary) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) + .setElementReader(elementReader) .build(); - break; - case DECIMAL: - treeReaders[i] = DecimalStreamReader.builder() + case MAP: + TypeDescription mapType = schema.getChildren().get(columnIndex); + TypeDescription keyType = mapType.getChildren().get(0); + TypeDescription valueType = mapType.getChildren().get(1); + TreeReader keyReader = createEncodedTreeReader(keyType, encodings, batch, codec, + skipCorrupt); + TreeReader valueReader = createEncodedTreeReader(valueType, encodings, batch, codec, + skipCorrupt); + return MapStreamReader.builder() .setColumnIndex(columnIndex) - .setPrecision(columnType.getPrecision()) - .setScale(columnType.getScale()) - .setPresentStream(present) - .setValueStream(data) - .setScaleStream(secondary) - .setCompressionCodec(codec) .setColumnEncoding(columnEncoding) - .build(); - break; - case TIMESTAMP: - treeReaders[i] = TimestampStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setSecondsStream(data) - .setNanosStream(secondary) .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .skipCorrupt(skipCorrupt) + .setPresentStream(present) + .setLengthStream(lengths) + .setKeyReader(keyReader) + .setValueReader(valueReader) .build(); - break; - case DATE: - treeReaders[i] = DateStreamReader.builder() + case STRUCT: + int childCount = schema.getChildren().size(); + TreeReader[] childReaders = new TreeReader[childCount]; + for (int i = 0; i < childCount; i++) { + TypeDescription childType = schema.getChildren().get(i); + childReaders[i] = createEncodedTreeReader(childType, encodings, batch, codec, + skipCorrupt); + } + return StructStreamReader.builder() .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) .setCompressionCodec(codec) .setColumnEncoding(columnEncoding) + .setPresentStream(present) + .setChildReaders(childReaders) .build(); + case UNION: + // FIXME: +// treeReaders[i] = UnionStreamReader.builder() +// .setColumnIndex(columnIndex) +// .setCompressionCodec(codec) +// .setColumnEncoding(columnEncoding) +// .setPresentStream(present) +// .build(); break; default: throw new UnsupportedOperationException("Data type not supported yet! 
" + columnType); } - } - return treeReaders; + return null; + } + + private static TreeReader getPrimitiveTreeReaders(final int columnIndex, + final TypeDescription columnType, final CompressionCodec codec, + final OrcProto.ColumnEncoding columnEncoding, + final ColumnStreamData present, final ColumnStreamData data, + final ColumnStreamData dictionary, final ColumnStreamData lengths, + final ColumnStreamData secondary, final boolean skipCorrupt) throws IOException { + switch (columnType.getCategory()) { + case BINARY: + return BinaryStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setLengthStream(lengths) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case BOOLEAN: + return BooleanStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .build(); + case BYTE: + return ByteStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .build(); + case SHORT: + return ShortStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case INT: + return IntStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case LONG: + return LongStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .skipCorrupt(skipCorrupt) + .build(); + case FLOAT: + return FloatStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .build(); + case DOUBLE: + return DoubleStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .build(); + case CHAR: + return CharStreamReader.builder() + .setColumnIndex(columnIndex) + .setMaxLength(columnType.getMaxLength()) + .setPresentStream(present) + .setDataStream(data) + .setLengthStream(lengths) + .setDictionaryStream(dictionary) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case VARCHAR: + return VarcharStreamReader.builder() + .setColumnIndex(columnIndex) + .setMaxLength(columnType.getMaxLength()) + .setPresentStream(present) + .setDataStream(data) + .setLengthStream(lengths) + .setDictionaryStream(dictionary) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case STRING: + return StringStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setLengthStream(lengths) + .setDictionaryStream(dictionary) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case DECIMAL: + return DecimalStreamReader.builder() + .setColumnIndex(columnIndex) + .setPrecision(columnType.getPrecision()) + .setScale(columnType.getScale()) + .setPresentStream(present) + .setValueStream(data) + .setScaleStream(secondary) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case TIMESTAMP: + return TimestampStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setSecondsStream(data) + .setNanosStream(secondary) + 
.setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .skipCorrupt(skipCorrupt) + .build(); + case DATE: + return DateStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + } + return null; } } diff --git a/ql/src/test/queries/clientpositive/vector_complex_all.q b/ql/src/test/queries/clientpositive/vector_complex_all.q index 1f23b60..e1efe23 100644 --- a/ql/src/test/queries/clientpositive/vector_complex_all.q +++ b/ql/src/test/queries/clientpositive/vector_complex_all.q @@ -2,6 +2,7 @@ set hive.cli.print.header=true; set hive.explain.user=false; set hive.fetch.task.conversion=none; SET hive.vectorized.execution.enabled=true; +set hive.llap.io.enabled=false; CREATE TABLE orc_create_staging ( str STRING, @@ -24,20 +25,24 @@ CREATE TABLE orc_create_complex ( INSERT OVERWRITE TABLE orc_create_complex SELECT * FROM orc_create_staging; +set hive.llap.io.enabled=true; + -- Since complex types are not supported, this query should not vectorize. EXPLAIN SELECT * FROM orc_create_complex; SELECT * FROM orc_create_complex; --- However, since this query is not referencing the complex fields, it should vectorize. -EXPLAIN -SELECT COUNT(*) FROM orc_create_complex; +SELECT str FROM orc_create_complex; -SELECT COUNT(*) FROM orc_create_complex; +SELECT strct, mp, lst FROM orc_create_complex; --- Also, since this query is not referencing the complex fields, it should vectorize. -EXPLAIN -SELECT str FROM orc_create_complex ORDER BY str; +SELECT lst, str FROM orc_create_complex; + +SELECT mp, str FROM orc_create_complex; + +SELECT strct, str FROM orc_create_complex; + +SELECT strct.B, str FROM orc_create_complex; -SELECT str FROM orc_create_complex ORDER BY str; \ No newline at end of file +-- TODO: add multi-stripe test. also test complex types with some nulls so that present stream is not suppressed.