diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 2776fe95f1..544c83672d 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -576,6 +576,7 @@ minillaplocal.query.files=\
   lineage2.q,\
   lineage3.q,\
   list_bucket_dml_10.q,\
+  llap_acid2.q,\
   llap_partitioned.q,\
   llap_vector_nohybridgrace.q,\
   load_data_acid_rename.q,\
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
index 9262bf0ce3..19b0b55c56 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
@@ -43,4 +43,44 @@ public void swapColumnVector(int ix, ColumnVector[] other, int otherIx) {
     other[otherIx] = cols[ix];
     cols[ix] = old;
   }
+
+
+  @Override
+  public String toString() {
+    if (size == 0) {
+      return "";
+    }
+    StringBuilder b = new StringBuilder();
+    b.append("Column vector types: ");
+    for (int k = 0; k < cols.length; k++) {
+      ColumnVector cv = cols[k];
+      b.append(k);
+      b.append(":");
+      b.append(cv == null ? "null" : cv.getClass().getSimpleName().replace("ColumnVector", ""));
+    }
+    b.append('\n');
+
+
+    for (int i = 0; i < size; i++) {
+      b.append('[');
+      for (int k = 0; k < cols.length; k++) {
+        ColumnVector cv = cols[k];
+        if (k > 0) {
+          b.append(", ");
+        }
+        if (cv == null) continue;
+        try {
+          cv.stringifyValue(b, i);
+        } catch (Exception ex) {
+          b.append("invalid");
+        }
+      }
+      b.append(']');
+      if (i < size - 1) {
+        b.append('\n');
+      }
+    }
+
+    return b.toString();
+  }
 }
\ No newline at end of file
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index bb319f0d4a..6d29163fbf 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -100,9 +100,12 @@
     FileSplit fileSplit = (FileSplit) split;
     reporter.setStatus(fileSplit.toString());
     try {
-      List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
+      // At this entry point, we are going to assume that these are logical table columns.
+      // Perhaps we should go thru the code and clean this up to be more explicit; for now, we
+      // will start with this single assumption and maintain clear semantics from here.
+      List<Integer> tableIncludedCols = ColumnProjectionUtils.isReadAllColumns(job)
          ? null : ColumnProjectionUtils.getReadColumnIDs(job);
-      LlapRecordReader rr = LlapRecordReader.create(job, fileSplit, includedCols, hostName,
+      LlapRecordReader rr = LlapRecordReader.create(job, fileSplit, tableIncludedCols, hostName,
          cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf);
       if (rr == null) {
         // Reader-specific incompatibility like SMB or schema evolution.
@@ -111,7 +114,7 @@
       // For non-vectorized operator case, wrap the reader if possible.
       RecordReader<NullWritable, VectorizedRowBatch> result = rr;
       if (!Utilities.getIsVectorized(job)) {
-        result = wrapLlapReader(includedCols, rr, split);
+        result = wrapLlapReader(tableIncludedCols, rr, split);
         if (result == null) {
           // Cannot wrap a reader for non-vectorized pipeline.
return sourceInputFormat.getRecordReader(split, job, reporter); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java index a69c9a023c..3a2c19a3e6 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool; import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer; +import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes; +import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory; import org.apache.hadoop.hive.llap.io.decode.ReadPipeline; import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -44,15 +46,16 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.io.orc.OrcSplit; import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader; import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; +import org.apache.hadoop.hive.ql.io.orc.encoded.Reader; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; @@ -70,17 +73,18 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; class LlapRecordReader implements RecordReader, Consumer { + private static final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class); private static final Object DONE_OBJECT = new Object(); private final FileSplit split; - private List columnIds; + private final IncludesImpl includes; private final SearchArgument sarg; - private final String[] columnNames; private final VectorizedRowBatchCtx rbCtx; private final Object[] partitionValues; @@ -100,21 +104,19 @@ private final JobConf jobConf; private final ReadPipeline rp; private final ExecutorService executor; - private final int columnCount; private final boolean isAcidScan; /** * Creates the record reader and checks the input-specific compatibility. * @return The reader if the split can be read, null otherwise. 
*/ - public static LlapRecordReader create(JobConf job, FileSplit split, List includedCols, - String hostName, ColumnVectorProducer cvp, ExecutorService executor, - InputFormat sourceInputFormat, Deserializer sourceSerDe, Reporter reporter, - Configuration daemonConf) - throws IOException, HiveException { + public static LlapRecordReader create(JobConf job, FileSplit split, + List tableIncludedCols, String hostName, ColumnVectorProducer cvp, + ExecutorService executor, InputFormat sourceInputFormat, Deserializer sourceSerDe, + Reporter reporter, Configuration daemonConf) throws IOException, HiveException { MapWork mapWork = findMapWork(job); if (mapWork == null) return null; // No compatible MapWork. - LlapRecordReader rr = new LlapRecordReader(mapWork, job, split, includedCols, hostName, + LlapRecordReader rr = new LlapRecordReader(mapWork, job, split, tableIncludedCols, hostName, cvp, executor, sourceInputFormat, sourceSerDe, reporter, daemonConf); if (!rr.checkOrcSchemaEvolution()) { rr.close(); @@ -124,7 +126,7 @@ public static LlapRecordReader create(JobConf job, FileSplit split, List includedCols, String hostName, ColumnVectorProducer cvp, + List tableIncludedCols, String hostName, ColumnVectorProducer cvp, ExecutorService executor, InputFormat sourceInputFormat, Deserializer sourceSerDe, Reporter reporter, Configuration daemonConf) throws IOException, HiveException { this.executor = executor; @@ -132,7 +134,6 @@ private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split, this.split = split; this.sarg = ConvertAstToSearchArg.createFromConf(job); - this.columnNames = ColumnProjectionUtils.getReadColumnNames(job); final String fragmentId = LlapTezUtils.getFragmentId(job); final String dagId = LlapTezUtils.getDagId(job); final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID); @@ -152,32 +153,11 @@ private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split, VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx(); rbCtx = ctx != null ? ctx : LlapInputFormat.createFakeVrbCtx(mapWork); - // Note: columnIds below makes additional changes for ACID. Don't use this var directly. - if (includedCols == null) { - // Assume including everything means the VRB will have everything. - includedCols = new ArrayList<>(rbCtx.getRowColumnTypeInfos().length); - for (int i = 0; i < rbCtx.getRowColumnTypeInfos().length; ++i) { - includedCols.add(i); - } - } - isAcidScan = AcidUtils.isFullAcidScan(jobConf); TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr( job, isAcidScan, Integer.MAX_VALUE); - if (isAcidScan) { - this.columnIds = new ArrayList<>(); - final int ACID_FIELDS = OrcInputFormat.getRootColumn(false); - for (int i = 0; i < ACID_FIELDS; i++) { - columnIds.add(i); - } - for (int i = 0; i < includedCols.size(); i++) { - columnIds.add(i + ACID_FIELDS); - } - this.columnCount = columnIds.size(); - } else { - this.columnIds = includedCols; - this.columnCount = columnIds.size(); - } + + this.includes = new IncludesImpl(tableIncludedCols, isAcidScan, rbCtx, schema, job); int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_BASE, job, daemonConf); int queueLimitMin = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf); @@ -195,9 +175,8 @@ private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split, } // Create the consumer of encoded data; it will coordinate decoding to CVBs. 
- feedback = rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames, - counters, schema, sourceInputFormat, sourceSerDe, reporter, job, - mapWork.getPathToPartitionInfo()); + feedback = rp = cvp.createReadPipeline(this, split, includes, sarg, counters, includes, + sourceInputFormat, sourceSerDe, reporter, job, mapWork.getPathToPartitionInfo()); } private static int getQueueVar(ConfVars var, JobConf jobConf, Configuration daemonConf) { @@ -287,11 +266,11 @@ public void start() { private boolean checkOrcSchemaEvolution() { SchemaEvolution evolution = rp.getSchemaEvolution(); - for (int i = 0; i < columnCount; ++i) { - int projectedColId = columnIds == null ? i : columnIds.get(i); + // TODO: should this just use physical IDs? + for (int i = 0; i < includes.getReaderLogicalColumnIds().size(); ++i) { + int projectedColId = includes.getReaderLogicalColumnIds().get(i); // Adjust file column index for ORC struct. - // LLAP IO does not support ACID. When it supports, this would be auto adjusted. - int fileColId = OrcInputFormat.getRootColumn(!isAcidScan) + projectedColId + 1; + int fileColId = OrcInputFormat.getRootColumn(!isAcidScan) + projectedColId + 1; if (!evolution.isPPDSafeConversion(fileColId)) { LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", split); return false; @@ -301,8 +280,8 @@ private boolean checkOrcSchemaEvolution() { } @Override - public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { - assert value != null; + public boolean next(NullWritable key, VectorizedRowBatch vrb) throws IOException { + assert vrb != null; if (isClosed) { throw new AssertionError("next called after close"); } @@ -310,7 +289,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti boolean wasFirst = isFirst; if (isFirst) { if (partitionValues != null) { - rbCtx.addPartitionColsToBatch(value, partitionValues); + rbCtx.addPartitionColsToBatch(vrb, partitionValues); } isFirst = false; } @@ -332,59 +311,47 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti } final boolean isVectorized = HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED); - if (isAcidScan) { - value.selectedInUse = true; + vrb.selectedInUse = true; if (isVectorized) { - final VectorizedRowBatch acidVrb = new VectorizedRowBatch(cvb.cols.length); - acidVrb.cols = cvb.cols; - acidVrb.size = cvb.size; - final VectorizedOrcAcidRowBatchReader acidReader = - new VectorizedOrcAcidRowBatchReader((OrcSplit)split, jobConf, Reporter.NULL, - new RecordReader() { - @Override - public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { - return true; - } - - @Override - public NullWritable createKey() { - return NullWritable.get(); - } - - @Override - public VectorizedRowBatch createValue() { - return acidVrb; - } - - @Override - public long getPos() throws IOException { - return 0; - } - - @Override - public void close() throws IOException { - } - - @Override - public float getProgress() throws IOException { - return 0; - } - }, rbCtx); - acidReader.next(NullWritable.get(), value); + // TODO: relying everywhere on the magical constants and columns being together means ACID + // columns are going to be super hard to change in a backward compat manner. I can + // foresee someone cursing while refactoring all the magic for prefix schema changes. + // Exclude the row column. 
+ int acidColCount = OrcInputFormat.getRootColumn(false) - 1; + VectorizedRowBatch inputVrb = new VectorizedRowBatch( + acidColCount + 1 + vrb.getDataColumnCount() ); + // By assumption, ACID columns are currently always in the beginning of the arrays. + System.arraycopy(cvb.cols, 0, inputVrb.cols, 0, acidColCount); + for (int ixInReadSet = acidColCount; ixInReadSet < cvb.cols.length; ++ixInReadSet) { + int ixInVrb = includes.getPhysicalColumnIds().get(ixInReadSet); + // TODO: should we create the batch from vrbctx, and reuse the vectors, like below? Future work. + inputVrb.cols[ixInVrb] = cvb.cols[ixInReadSet]; + } + inputVrb.size = cvb.size; + // TODO: reuse between calls + @SuppressWarnings("resource") + VectorizedOrcAcidRowBatchReader acidReader = new VectorizedOrcAcidRowBatchReader( + (OrcSplit)split, jobConf, Reporter.NULL, new AcidWrapper(inputVrb), rbCtx, true); + acidReader.next(NullWritable.get(), vrb); + } else { + // TODO: WTF? The old code seems to just drop the ball here. + throw new AssertionError("Unsupported mode"); } } else { - if (columnCount != cvb.cols.length) { - throw new RuntimeException("Unexpected number of columns, VRB has " + columnCount - + " included, but the reader returned " + cvb.cols.length); + if (includes.getPhysicalColumnIds().size() != cvb.cols.length) { + throw new RuntimeException("Unexpected number of columns, VRB has " + + includes.getPhysicalColumnIds().size() + " included, but the reader returned " + + cvb.cols.length); } - // VRB was created from VrbCtx, so we already have pre-allocated column vectors - for (int i = 0; i < cvb.cols.length; ++i) { - // Return old CVs (if any) to caller. We assume these things all have the same schema. - cvb.swapColumnVector(i, value.cols, columnIds.get(i)); + // VRB was created from VrbCtx, so we already have pre-allocated column vectors. + // Return old CVs (if any) to caller. We assume these things all have the same schema. + for (int ixInReadSet = 0; ixInReadSet < cvb.cols.length; ++ixInReadSet) { + int ixInVrb = includes.getPhysicalColumnIds().get(ixInReadSet); + cvb.swapColumnVector(ixInReadSet, vrb.cols, ixInVrb); } - value.selectedInUse = false; - value.size = cvb.size; + vrb.selectedInUse = false; + vrb.size = cvb.size; } if (wasFirst) { @@ -397,6 +364,44 @@ public VectorizedRowBatchCtx getVectorizedRowBatchCtx() { return rbCtx; } + private static final class AcidWrapper + implements RecordReader { + private final VectorizedRowBatch acidVrb; + + private AcidWrapper(VectorizedRowBatch acidVrb) { + this.acidVrb = acidVrb; + } + + @Override + public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { + return true; + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public VectorizedRowBatch createValue() { + return acidVrb; + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void close() throws IOException { + } + + @Override + public float getProgress() throws IOException { + return 0; + } + } + private final class IOUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { @Override public void uncaughtException(final Thread t, final Throwable e) { @@ -528,4 +533,99 @@ public float getProgress() throws IOException { // TODO: plumb progress info thru the reader if we can get metadata from loader first. return 0.0f; } -} + + + /** This class encapsulates include-related logic for LLAP readers. 
It is not actually specific + * to LLAP IO but in LLAP IO in particular, I want to encapsulate all this mess for now until + * we have smth better like Schema Evolution v2. This can also hypothetically encapsulate + * field pruning inside structs and stuff like that. */ + private static class IncludesImpl implements SchemaEvolutionFactory, Includes { + private List readerLogicalColumnIds; + private List filePhysicalColumnIds; + private Integer acidStructColumnId = null; + + // For current schema evolution. + private TypeDescription readerSchema; + private JobConf jobConf; + + public IncludesImpl(List tableIncludedCols, boolean isAcidScan, + VectorizedRowBatchCtx rbCtx, TypeDescription readerSchema, JobConf jobConf) { + // Note: columnIds below makes additional changes for ACID. Don't use this var directly. + this.readerSchema = readerSchema; + this.jobConf = jobConf; + if (tableIncludedCols == null) { + // Assume including everything means the VRB will have everything. + // TODO: this is rather brittle, esp. in view of schema evolution (in abstract, not as + // currently implemented in Hive). The compile should supply the columns it expects + // to see, which is not "all, of any schema". Is VRB row CVs the right mechanism + // for that? Who knows. Perhaps resolve in schema evolution v2. + tableIncludedCols = new ArrayList<>(rbCtx.getRowColumnTypeInfos().length); + for (int i = 0; i < rbCtx.getRowColumnTypeInfos().length; ++i) { + tableIncludedCols.add(i); + } + } + LOG.debug("Logical table includes: {}", tableIncludedCols); + this.readerLogicalColumnIds = tableIncludedCols; + // Note: schema evolution currently does not support column index changes. + // So, the indices should line up... to be fixed in SE v2? + List filePhysicalColumnIds = readerLogicalColumnIds; + if (isAcidScan) { + int rootCol = OrcInputFormat.getRootColumn(false); + filePhysicalColumnIds = new ArrayList(filePhysicalColumnIds.size() + rootCol); + this.acidStructColumnId = rootCol - 1; // OrcRecordUpdater.ROW. This is somewhat fragile... + // Note: this guarantees that physical column IDs are in order. + for (int i = 0; i < rootCol; ++i) { + // We don't want to include the root struct in ACID case; it would cause the whole + // struct to get read without projection. + if (acidStructColumnId == i) continue; + filePhysicalColumnIds.add(i); + } + for (int tableColumnId : readerLogicalColumnIds) { + filePhysicalColumnIds.add(rootCol + tableColumnId); + } + } + + this.filePhysicalColumnIds = filePhysicalColumnIds; + } + + @Override + public String toString() { + return "logical columns " + readerLogicalColumnIds + + ", physical columns " + filePhysicalColumnIds; + } + + @Override + public SchemaEvolution createSchemaEvolution(TypeDescription fileSchema) { + if (readerSchema == null) { + readerSchema = fileSchema; + } + // TODO: will this work correctly with ACID? 
+ boolean[] readerIncludes = OrcInputFormat.genIncludedColumns( + readerSchema, readerLogicalColumnIds); + Reader.Options options = new Reader.Options(jobConf).include(readerIncludes); + return new SchemaEvolution(fileSchema, readerSchema, options); + } + + @Override + public boolean[] generateFileIncludes(TypeDescription fileSchema) { + return OrcInputFormat.genIncludedColumns( + fileSchema, filePhysicalColumnIds, acidStructColumnId); + } + + @Override + public List getPhysicalColumnIds() { + return filePhysicalColumnIds; + } + + @Override + public List getReaderLogicalColumnIds() { + return readerLogicalColumnIds; + } + + @Override + public TypeDescription[] getBatchReaderTypes(TypeDescription fileSchema) { + return OrcInputFormat.genIncludedTypes( + fileSchema, filePhysicalColumnIds, acidStructColumnId); + } + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java index 2a2be56cab..a830c07c9e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java @@ -34,14 +34,25 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.orc.TypeDescription; +import org.apache.orc.impl.SchemaEvolution; /** * Entry point used by LlapInputFormat to create read pipeline to get data. */ public interface ColumnVectorProducer { + public interface SchemaEvolutionFactory { + SchemaEvolution createSchemaEvolution(TypeDescription fileSchema); + } + + public interface Includes { + boolean[] generateFileIncludes(TypeDescription fileSchema); + List getPhysicalColumnIds(); + List getReaderLogicalColumnIds(); + TypeDescription[] getBatchReaderTypes(TypeDescription fileSchema); + } + ReadPipeline createReadPipeline(Consumer consumer, FileSplit split, - List columnIds, SearchArgument sarg, String[] columnNames, - QueryFragmentCounters counters, TypeDescription readerSchema, - InputFormat sourceInputFormat, Deserializer sourceSerDe, Reporter reporter, - JobConf job, Map parts) throws IOException; + Includes includes, SearchArgument sarg, QueryFragmentCounters counters, + SchemaEvolutionFactory sef, InputFormat sourceInputFormat, Deserializer sourceSerDe, + Reporter reporter, JobConf job, Map parts) throws IOException; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java index d66e2f2540..7af1b050ce 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes; import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader; import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata; @@ -81,13 +82,12 @@ public GenericColumnVectorProducer(SerDeLowLevelCacheImpl serdeCache, @Override public ReadPipeline createReadPipeline(Consumer consumer, FileSplit split, - List columnIds, 
SearchArgument sarg, String[] columnNames, - QueryFragmentCounters counters, TypeDescription schema, InputFormat sourceInputFormat, - Deserializer sourceSerDe, Reporter reporter, JobConf job, Map parts) - throws IOException { + Includes includes, SearchArgument sarg, QueryFragmentCounters counters, + SchemaEvolutionFactory sef, InputFormat sourceInputFormat, Deserializer sourceSerDe, + Reporter reporter, JobConf job, Map parts) throws IOException { cacheMetrics.incrCacheReadRequests(); OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer( - consumer, columnIds.size(), false, counters, ioMetrics); + consumer, includes, false, counters, ioMetrics); SerDeFileMetadata fm; try { fm = new SerDeFileMetadata(sourceSerDe); @@ -97,13 +97,10 @@ public ReadPipeline createReadPipeline(Consumer consumer, Fil edc.setFileMetadata(fm); // Note that we pass job config to the record reader, but use global config for LLAP IO. // TODO: add tracing to serde reader - SerDeEncodedDataReader reader = new SerDeEncodedDataReader(cache, - bufferManager, conf, split, columnIds, edc, job, reporter, sourceInputFormat, + SerDeEncodedDataReader reader = new SerDeEncodedDataReader(cache, bufferManager, conf, + split, includes.getPhysicalColumnIds(), edc, job, reporter, sourceInputFormat, sourceSerDe, counters, fm.getSchema(), parts); edc.init(reader, reader, new IoTrace(0, false)); - if (LlapIoImpl.LOG.isDebugEnabled()) { - LlapIoImpl.LOG.debug("Ignoring schema: " + schema); - } return edc; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java index 3a7b192a1c..2a0c5ca92f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java @@ -85,16 +85,15 @@ public Configuration getConf() { @Override public ReadPipeline createReadPipeline( - Consumer consumer, FileSplit split, List columnIds, - SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters, - TypeDescription readerSchema, InputFormat unused0, Deserializer unused1, - Reporter reporter, JobConf job, Map unused2) throws IOException { + Consumer consumer, FileSplit split, Includes includes, + SearchArgument sarg, QueryFragmentCounters counters, SchemaEvolutionFactory sef, + InputFormat unused0, Deserializer unused1, Reporter reporter, JobConf job, + Map unused2) throws IOException { cacheMetrics.incrCacheReadRequests(); - OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(), - _skipCorrupt, counters, ioMetrics); - OrcEncodedDataReader reader = new OrcEncodedDataReader( - lowLevelCache, bufferManager, metadataCache, conf, job, split, columnIds, sarg, - columnNames, edc, counters, readerSchema, tracePool); + OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer( + consumer, includes, _skipCorrupt, counters, ioMetrics); + OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager, + metadataCache, conf, job, split, includes, sarg, edc, counters, sef, tracePool); edc.init(reader, reader, reader.getTrace()); return edc; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index 36810d9480..9e8ae103ee 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ 
llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes; import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata; import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; @@ -62,21 +63,22 @@ public class OrcEncodedDataConsumer extends EncodedDataConsumer { private TreeReaderFactory.TreeReader[] columnReaders; - private int[] columnMapping; // Mapping from columnReaders (by index) to columns in file schema. private int previousStripeIndex = -1; private ConsumerFileMetadata fileMetadata; // We assume one request is only for one file. private CompressionCodec codec; private List stripes; private final boolean skipCorrupt; // TODO: get rid of this private final QueryFragmentCounters counters; - private boolean[] includedColumns; private SchemaEvolution evolution; private IoTrace trace; + private final Includes includes; + private TypeDescription[] batchSchemas; public OrcEncodedDataConsumer( - Consumer consumer, int colCount, boolean skipCorrupt, + Consumer consumer, Includes includes, boolean skipCorrupt, QueryFragmentCounters counters, LlapDaemonIOMetrics ioMetrics) { - super(consumer, colCount, ioMetrics); + super(consumer, includes.getPhysicalColumnIds().size(), ioMetrics); + this.includes = includes; // TODO: get rid of this this.skipCorrupt = skipCorrupt; this.counters = counters; @@ -120,20 +122,10 @@ protected void decodeBatch(OrcEncodedColumnBatch batch, } int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1); int batchSize = VectorizedRowBatch.DEFAULT_SIZE; - TypeDescription schema = fileMetadata.getSchema(); + TypeDescription fileSchema = fileMetadata.getSchema(); if (columnReaders == null || !sameStripe) { - int[] columnMapping = new int[schema.getChildren().size()]; - TreeReaderFactory.Context context = - new TreeReaderFactory.ReaderContext() - .setSchemaEvolution(evolution) - .writerTimeZone(stripeMetadata.getWriterTimezone()) - .skipCorrupt(skipCorrupt); - StructTreeReader treeReader = EncodedTreeReaderFactory.createRootTreeReader( - schema, stripeMetadata.getEncodings(), batch, codec, context, columnMapping); - this.columnReaders = treeReader.getChildReaders(); - this.columnMapping = Arrays.copyOf(columnMapping, columnReaders.length); - positionInStreams(columnReaders, batch.getBatchKey(), stripeMetadata); + createColumnReaders(batch, stripeMetadata, fileSchema); } else { repositionInStreams(this.columnReaders, batch, sameStripe, stripeMetadata); } @@ -154,8 +146,7 @@ protected void decodeBatch(OrcEncodedColumnBatch batch, if (cvb.cols[idx] == null) { // Orc store rows inside a root struct (hive writes it this way). // When we populate column vectors we skip over the root struct. 
- cvb.cols[idx] = createColumn(schema.getChildren().get(columnMapping[idx]), - VectorizedRowBatch.DEFAULT_SIZE); + cvb.cols[idx] = createColumn(batchSchemas[idx], VectorizedRowBatch.DEFAULT_SIZE); } trace.logTreeReaderNextVector(idx); @@ -214,6 +205,25 @@ protected void decodeBatch(OrcEncodedColumnBatch batch, } } + private void createColumnReaders(OrcEncodedColumnBatch batch, + ConsumerStripeMetadata stripeMetadata, TypeDescription fileSchema) throws IOException { + TreeReaderFactory.Context context = new TreeReaderFactory.ReaderContext() + .setSchemaEvolution(evolution).skipCorrupt(skipCorrupt) + .writerTimeZone(stripeMetadata.getWriterTimezone()); + this.batchSchemas = includes.getBatchReaderTypes(fileSchema); + StructTreeReader treeReader = EncodedTreeReaderFactory.createRootTreeReader( + batchSchemas, stripeMetadata.getEncodings(), batch, codec, context); + this.columnReaders = treeReader.getChildReaders(); + + if (LlapIoImpl.LOG.isDebugEnabled()) { + for (int i = 0; i < columnReaders.length; ++i) { + LlapIoImpl.LOG.debug("Created a reader at " + i + ": " + columnReaders[i] + + " from schema " + batchSchemas[i]); + } + } + positionInStreams(columnReaders, batch.getBatchKey(), stripeMetadata); + } + private ColumnVector createColumn(TypeDescription type, int batchSize) { switch (type.getCategory()) { case BOOLEAN: @@ -344,15 +354,6 @@ private long getRowCount(OrcProto.RowIndexEntry rowIndexEntry) { return rowIndexEntry.getStatistics().getNumberOfValues(); } - @Override - public boolean[] getIncludedColumns() { - return includedColumns; - } - - public void setIncludedColumns(final boolean[] includedColumns) { - this.includedColumns = includedColumns; - } - public void setSchemaEvolution(SchemaEvolution evolution) { this.evolution = evolution; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java index 06708d34a7..05b142987e 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java @@ -27,5 +27,4 @@ public interface ReadPipeline extends ConsumerFeedback { public Callable getReadCallable(); SchemaEvolution getSchemaEvolution(); - boolean[] getIncludedColumns(); } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index a6d2a0497c..0270e54f18 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -67,6 +67,8 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes; +import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory; import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.MetadataCache; @@ -79,6 +81,7 @@ import org.apache.orc.DataReader; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.orc.OrcConf; 
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcSplit; @@ -154,9 +157,7 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) { private final BufferUsageManager bufferManager; private final Configuration daemonConf, jobConf; private final FileSplit split; - private List includedColumnIds; private final SearchArgument sarg; - private final String[] columnNames; private final OrcEncodedDataConsumer consumer; private final QueryFragmentCounters counters; private final UserGroupInformation ugi; @@ -184,27 +185,21 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) { @SuppressWarnings("unused") private volatile boolean isPaused = false; - boolean[] readerIncludes = null, sargColumns = null, fileIncludes = null; + boolean[] sargColumns = null, fileIncludes = null; private final IoTrace trace; private Pool tracePool; public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager, MetadataCache metadataCache, Configuration daemonConf, Configuration jobConf, - FileSplit split, List columnIds, SearchArgument sarg, String[] columnNames, - OrcEncodedDataConsumer consumer, QueryFragmentCounters counters, - TypeDescription readerSchema, Pool tracePool) + FileSplit split, Includes includes, SearchArgument sarg, OrcEncodedDataConsumer consumer, + QueryFragmentCounters counters, SchemaEvolutionFactory sef, Pool tracePool) throws IOException { this.lowLevelCache = lowLevelCache; this.metadataCache = metadataCache; this.bufferManager = bufferManager; this.daemonConf = daemonConf; this.split = split; - this.includedColumnIds = columnIds; - if (this.includedColumnIds != null) { - Collections.sort(this.includedColumnIds); - } this.sarg = sarg; - this.columnNames = columnNames; this.consumer = consumer; this.counters = counters; this.trace = tracePool.take(); @@ -216,8 +211,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff } this.useCodecPool = HiveConf.getBoolVar(daemonConf, ConfVars.HIVE_ORC_CODEC_POOL); - // moved this part of code from performDataRead as LlapInputFormat need to know the file schema - // to decide if schema evolution is supported or not. + // LlapInputFormat needs to know the file schema to decide if schema evolution is supported. orcReader = null; // 1. Get file metadata from cache, or create the reader and read it. // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that @@ -227,15 +221,12 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID)); fileMetadata = getFileFooterFromCacheOrDisk(); final TypeDescription fileSchema = fileMetadata.getSchema(); - if (readerSchema == null) { - readerSchema = fileMetadata.getSchema(); - } - readerIncludes = OrcInputFormat.genIncludedColumns(readerSchema, includedColumnIds); - if (AcidUtils.isFullAcidScan(jobConf)) { - fileIncludes = OrcInputFormat.shiftReaderIncludedForAcid(readerIncludes); - } else { - fileIncludes = OrcInputFormat.genIncludedColumns(fileSchema, includedColumnIds); + + fileIncludes = includes.generateFileIncludes(fileSchema); + if (LOG.isDebugEnabled()) { + LOG.debug("From {}, the file includes are {}", includes, DebugUtils.toString(fileIncludes)); } + // Do not allow users to override zero-copy setting. The rest can be taken from user config. 
boolean useZeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(daemonConf); if (useZeroCopy != OrcConf.USE_ZEROCOPY.getBoolean(jobConf)) { @@ -243,10 +234,9 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff jobConf.setBoolean(OrcConf.USE_ZEROCOPY.getAttribute(), useZeroCopy); } this.jobConf = jobConf; - Reader.Options options = new Reader.Options(jobConf).include(readerIncludes); - evolution = new SchemaEvolution(fileMetadata.getSchema(), readerSchema, options); + // TODO: setFileMetadata could just create schema. Called in two places; clean up later. + this.evolution = sef.createSchemaEvolution(fileMetadata.getSchema()); consumer.setFileMetadata(fileMetadata); - consumer.setIncludedColumns(readerIncludes); consumer.setSchemaEvolution(evolution); } @@ -290,9 +280,6 @@ protected Void performDataRead() throws IOException, InterruptedException { + (fileKey == null ? "" : " (" + fileKey + ")")); try { validateFileMetadata(); - if (includedColumnIds == null) { - includedColumnIds = getAllColumnIds(fileMetadata); - } // 2. Determine which stripes to read based on the split. determineStripesToRead(); @@ -316,12 +303,14 @@ protected Void performDataRead() throws IOException, InterruptedException { try { if (sarg != null && stride != 0) { // TODO: move this to a common method + // Note: this gets IDs by name, so we assume indices don't need to be adjusted for ACID. int[] filterColumns = RecordReaderImpl.mapSargColumnsToOrcInternalColIdx( sarg.getLeaves(), evolution); // included will not be null, row options will fill the array with trues if null sargColumns = new boolean[evolution.getFileSchema().getMaximumId() + 1]; for (int i : filterColumns) { // filter columns may have -1 as index which could be partition column in SARG. + // TODO: should this then be >=? if (i > 0) { sargColumns[i] = true; } @@ -332,7 +321,7 @@ protected Void performDataRead() throws IOException, InterruptedException { } // Now, apply SARG if any; w/o sarg, this will just initialize stripeRgs. - boolean hasData = determineRgsToRead(fileIncludes, stride, stripeMetadatas); + boolean hasData = determineRgsToRead(stride, stripeMetadatas); if (!hasData) { consumer.setDone(); recordReaderTime(startTime); @@ -530,19 +519,6 @@ private static Object determineFileId(FileSystem fs, FileSplit split, } /** - * Puts all column indexes from metadata to make a column list to read all column. - */ - private static List getAllColumnIds(OrcFileMetadata metadata) { - int rootColumn = OrcInputFormat.getRootColumn(true); - List types = metadata.getTypes().get(rootColumn).getSubtypesList(); - List columnIds = new ArrayList(types.size()); - for (int i = 0; i < types.size(); ++i) { - columnIds.add(i); - } - return columnIds; - } - - /** * Closes the stripe readers (on error). */ private void cleanupReaders() { @@ -815,7 +791,7 @@ public void returnData(OrcEncodedColumnBatch ecb) { * Determines which RGs need to be read, after stripes have been determined. * SARG is applied, and readState is populated for each stripe accordingly. 
*/ - private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride, + private boolean determineRgsToRead(int rowIndexStride, ArrayList metadata) throws IOException { RecordReaderImpl.SargApplier sargApp = null; if (sarg != null && rowIndexStride != 0) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index 166abf7c70..0cb1828868 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -948,7 +948,8 @@ private boolean processOneSlice(CacheWriter.CacheStripeData diskData, boolean[] OrcEncodedColumnBatch ecb = ECB_POOL.take(); ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, writerIncludes.length); - for (int colIx = 0; colIx < writerIncludes.length; ++colIx) { + // Skip the 0th column that is the root structure. + for (int colIx = 1; colIx < writerIncludes.length; ++colIx) { if (!writerIncludes[colIx]) continue; ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS); if (!hasAllData && splitIncludes[colIx]) { @@ -1035,18 +1036,15 @@ private boolean processOneSlice(Vectors diskData, boolean[] splitIncludes, int s ecb.init(fileKey, metadata.getStripeIx(), OrcEncodedColumnBatch.ALL_RGS, writerIncludes.length); int vectorsIx = 0; for (int colIx = 0; colIx < writerIncludes.length; ++colIx) { + // Skip the 0-th column, since it won't have a vector after reading the text source. + if (colIx == 0) continue; if (!writerIncludes[colIx]) continue; if (splitIncludes[colIx]) { - // Skip the 0-th column, since it won't have a vector after reading the text source. 
- if (colIx != 0 ) { - List vectors = diskData.getVectors(vectorsIx++); - if (LlapIoImpl.LOG.isTraceEnabled()) { - LlapIoImpl.LOG.trace("Processing vectors for column " + colIx + ": " + vectors); - } - ecb.initColumnWithVectors(colIx, vectors); - } else { - ecb.initColumn(0, OrcEncodedColumnBatch.MAX_DATA_STREAMS); + List vectors = diskData.getVectors(vectorsIx++); + if (LlapIoImpl.LOG.isTraceEnabled()) { + LlapIoImpl.LOG.trace("Processing vectors for column " + colIx + ": " + vectors); } + ecb.initColumnWithVectors(colIx, vectors); } else { ecb.initColumn(colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS); processColumnCacheData(cacheBuffers, ecb, colIx); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java index 681d9cad7b..f4293080f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java @@ -65,6 +65,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; /** @@ -112,7 +113,6 @@ private void initEntry(int logicalColumnIndex, int projectionColumnNum, TypeInfo */ public void init(StructObjectInspector structObjectInspector, List projectedColumns) throws HiveException { - List fields = structObjectInspector.getAllStructFieldRefs(); final int count = fields.size(); allocateArrays(count); @@ -125,7 +125,6 @@ public void init(StructObjectInspector structObjectInspector, List proj ObjectInspector fieldInspector = field.getFieldObjectInspector(); TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldInspector.getTypeName()); - initEntry(i, projectionColumnNum, typeInfo); } } @@ -148,7 +147,8 @@ public void init(TypeInfo[] typeInfos, int[] projectedColumns) * Initialize using data type names. * No projection -- the column range 0 .. 
types.size()-1 */ - public void init(List typeNames) throws HiveException { + @VisibleForTesting + void init(List typeNames) throws HiveException { final int count = typeNames.size(); allocateArrays(count); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java index 59b3ae9698..22d2f343b9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.vector; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index e9564852bd..cf0d0134db 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.io.orc; import org.apache.hadoop.hive.ql.plan.DynamicValue.NoDynamicValuesException; - import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -30,6 +29,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -342,10 +342,15 @@ public static boolean isOriginal(Footer footer) { return false; } - + public static boolean[] genIncludedColumns(TypeDescription readerSchema, List included) { + return genIncludedColumns(readerSchema, included, null); + } + public static boolean[] genIncludedColumns(TypeDescription readerSchema, + List included, + Integer recursiveStruct) { boolean[] result = new boolean[readerSchema.getMaximumId() + 1]; if (included == null) { Arrays.fill(result, true); @@ -355,15 +360,53 @@ public static boolean isOriginal(Footer footer) { List children = readerSchema.getChildren(); for (int columnNumber = 0; columnNumber < children.size(); ++columnNumber) { if (included.contains(columnNumber)) { - TypeDescription child = children.get(columnNumber); - for(int col = child.getId(); col <= child.getMaximumId(); ++col) { - result[col] = true; + addColumnToIncludes(children.get(columnNumber), result); + } else if (recursiveStruct != null && recursiveStruct == columnNumber) { + // This assumes all struct cols immediately follow struct + List nestedChildren = children.get(columnNumber).getChildren(); + for (int columnNumberDelta = 0; columnNumberDelta < nestedChildren.size(); ++columnNumberDelta) { + int columnNumberNested = columnNumber + 1 + columnNumberDelta; + if (included.contains(columnNumberNested)) { + addColumnToIncludes(nestedChildren.get(columnNumberDelta), result); + } + } + } + } + + return result; + } + + // Mostly dup of genIncludedColumns + public static TypeDescription[] genIncludedTypes(TypeDescription fileSchema, + List included, Integer recursiveStruct) { + TypeDescription[] result = new TypeDescription[included.size()]; + List children = fileSchema.getChildren(); + for (int columnNumber = 0; columnNumber < children.size(); ++columnNumber) { + int indexInBatchCols = included.indexOf(columnNumber); + if (indexInBatchCols >= 0) { + result[indexInBatchCols] = children.get(columnNumber); + } else if (recursiveStruct != null && recursiveStruct == columnNumber) { + // This assumes all struct cols 
immediately follow struct + List nestedChildren = children.get(columnNumber).getChildren(); + for (int columnNumberDelta = 0; columnNumberDelta < nestedChildren.size(); ++columnNumberDelta) { + int columnNumberNested = columnNumber + 1 + columnNumberDelta; + int nestedIxInBatchCols = included.indexOf(columnNumberNested); + if (nestedIxInBatchCols >= 0) { + result[nestedIxInBatchCols] = nestedChildren.get(columnNumberDelta); + } } } } return result; } + + private static void addColumnToIncludes(TypeDescription child, boolean[] result) { + for(int col = child.getId(); col <= child.getMaximumId(); ++col) { + result[col] = true; + } + } + /** * Reverses genIncludedColumns; produces the table columns indexes from ORC included columns. * @param readerSchema The ORC reader schema for the table. diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java index e296351225..77736eea75 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java @@ -72,6 +72,7 @@ protected float progress = 0.0f; protected Object[] partitionValues; private boolean addPartitionCols = true; + private final boolean isFlatPayload; private final ValidWriteIdList validWriteIdList; private final DeleteEventRegistry deleteEventRegistry; /** @@ -104,7 +105,8 @@ @VisibleForTesting VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, Reporter reporter, VectorizedRowBatchCtx rbCtx) throws IOException { - this(conf, inputSplit, reporter, rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx); + this(conf, inputSplit, reporter, + rbCtx == null ? Utilities.getVectorizedRowBatchCtx(conf) : rbCtx, false); final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, (OrcSplit) inputSplit); // Careful with the range here now, we do not want to read the whole base file like deltas. 
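Reviewer note (not part of the patch): a minimal sketch of how the new recursiveStruct argument to OrcInputFormat.genIncludedColumns/genIncludedTypes above is expected to behave for a full ACID scan. The class name, the example schema and the projected column are illustrative assumptions, as is the List<Integer> element type that the flattened signatures elide; the ACID constants (five metadata columns, the "row" struct at child ordinal 5, getRootColumn(false) == 6) follow OrcRecordUpdater as used elsewhere in this patch.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.orc.TypeDescription;

public class AcidIncludesSketch {
  public static void main(String[] args) {
    // Flat ACID file schema: five metadata columns plus the "row" struct holding the table columns.
    TypeDescription fileSchema = TypeDescription.fromString(
        "struct<operation:int,originalTransaction:bigint,bucket:int,"
        + "rowId:bigint,currentTransaction:bigint,row:struct<a:int,b:string>>");
    // Physical ids as IncludesImpl would produce them when only table column 1 ("b") is read:
    // metadata ordinals 0..4, then rootCol (6) + logical id (1) = 7. The "row" struct itself
    // (ordinal 5) is skipped so it does not force the whole struct to be read.
    List<Integer> filePhysicalColumnIds = Arrays.asList(0, 1, 2, 3, 4, 7);
    Integer acidStructColumnId = 5; // OrcRecordUpdater.ROW

    boolean[] includes = OrcInputFormat.genIncludedColumns(
        fileSchema, filePhysicalColumnIds, acidStructColumnId);
    // ORC type ids: root=0, operation=1, ..., currentTransaction=5, row=6, a=7, b=8.
    // Expect ids 1..5 (metadata) and 8 (field "b") to be true; 0, 6 and 7 stay false.
    System.out.println(Arrays.toString(includes));

    TypeDescription[] batchTypes = OrcInputFormat.genIncludedTypes(
        fileSchema, filePhysicalColumnIds, acidStructColumnId);
    // One TypeDescription per read column, in filePhysicalColumnIds order:
    // int, bigint, int, bigint, bigint, string. This is what OrcEncodedDataConsumer now uses
    // (via Includes.getBatchReaderTypes) to create column vectors without the root struct.
    System.out.println(Arrays.toString(batchTypes));
  }
}

The point of recursiveStruct is that the wrapper struct is never marked as included itself, so projection inside the ACID "row" struct keeps working while its fields are addressed by the flat ordinals the rest of the patch assumes.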
@@ -143,20 +145,22 @@ public float getProgress() throws IOException { }; this.vectorizedRowBatchBase = ((RecordReaderImpl) innerReader).createRowBatch(); } + /** * LLAP IO c'tor */ public VectorizedOrcAcidRowBatchReader(OrcSplit inputSplit, JobConf conf, Reporter reporter, org.apache.hadoop.mapred.RecordReader baseReader, - VectorizedRowBatchCtx rbCtx) throws IOException { - this(conf, inputSplit, reporter, rbCtx); + VectorizedRowBatchCtx rbCtx, boolean isFlatPayload) throws IOException { + this(conf, inputSplit, reporter, rbCtx, isFlatPayload); this.baseReader = baseReader; this.innerReader = null; this.vectorizedRowBatchBase = baseReader.createValue(); } private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporter reporter, - VectorizedRowBatchCtx rowBatchCtx) throws IOException { + VectorizedRowBatchCtx rowBatchCtx, boolean isFlatPayload) throws IOException { + this.isFlatPayload = isFlatPayload; this.rbCtx = rowBatchCtx; final boolean isAcidRead = AcidUtils.isFullAcidScan(conf); final AcidUtils.AcidOperationalProperties acidOperationalProperties @@ -206,13 +210,13 @@ private VectorizedOrcAcidRowBatchReader(JobConf conf, OrcSplit orcSplit, Reporte } this.deleteEventRegistry = der; isOriginal = orcSplit.isOriginal(); - if(isOriginal) { + if (isOriginal) { recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, new LongColumnVector(), new LongColumnVector(), new LongColumnVector()); - } - else { - //will swap in the Vectors from underlying row batch - recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, null, null, null); + } else { + // Will swap in the Vectors from underlying row batch. + recordIdColumnVector = new StructColumnVector( + VectorizedRowBatch.DEFAULT_SIZE, null, null, null); } rowIdProjected = areRowIdsProjected(rbCtx); rootPath = orcSplit.getRootDir(); @@ -412,73 +416,10 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti selectedBitSet.set(0, vectorizedRowBatchBase.size, true); } ColumnVector[] innerRecordIdColumnVector = vectorizedRowBatchBase.cols; - if(isOriginal) { - /* - * If there are deletes and reading original file, we must produce synthetic ROW_IDs in order - * to see if any deletes apply - */ - boolean needSyntheticRowId = - needSyntheticRowIds(true, !deleteEventRegistry.isEmpty(), rowIdProjected); - if(needSyntheticRowId) { - assert syntheticProps != null && syntheticProps.rowIdOffset >= 0 : "" + syntheticProps; - assert syntheticProps != null && syntheticProps.bucketProperty >= 0 : "" + syntheticProps; - if(innerReader == null) { - throw new IllegalStateException(getClass().getName() + " requires " + - org.apache.orc.RecordReader.class + - " to handle original files that require ROW__IDs: " + rootPath); - } - /** - * {@link RecordIdentifier#getWriteId()} - */ - recordIdColumnVector.fields[0].noNulls = true; - recordIdColumnVector.fields[0].isRepeating = true; - ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = syntheticProps.syntheticTxnId; - /** - * This is {@link RecordIdentifier#getBucketProperty()} - * Also see {@link BucketCodec} - */ - recordIdColumnVector.fields[1].noNulls = true; - recordIdColumnVector.fields[1].isRepeating = true; - ((LongColumnVector)recordIdColumnVector.fields[1]).vector[0] = syntheticProps.bucketProperty; - /** - * {@link RecordIdentifier#getRowId()} - */ - recordIdColumnVector.fields[2].noNulls = true; - recordIdColumnVector.fields[2].isRepeating = false; - long[] rowIdVector = 
((LongColumnVector)recordIdColumnVector.fields[2]).vector; - for(int i = 0; i < vectorizedRowBatchBase.size; i++) { - //baseReader.getRowNumber() seems to point at the start of the batch todo: validate - rowIdVector[i] = syntheticProps.rowIdOffset + innerReader.getRowNumber() + i; - } - //Now populate a structure to use to apply delete events - innerRecordIdColumnVector = new ColumnVector[OrcRecordUpdater.FIELDS]; - innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_WRITEID] = recordIdColumnVector.fields[0]; - innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1]; - innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2]; - //these are insert events so (original txn == current) txn for all rows - innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_WRITEID] = recordIdColumnVector.fields[0]; - } - if(syntheticProps.syntheticTxnId > 0) { - //"originals" (written before table was converted to acid) is considered written by - // txnid:0 which is always committed so there is no need to check wrt invalid transactions - //But originals written by Load Data for example can be in base_x or delta_x_x so we must - //check if 'x' is committed or not evn if ROW_ID is not needed in the Operator pipeline. - if (needSyntheticRowId) { - findRecordsWithInvalidTransactionIds(innerRecordIdColumnVector, - vectorizedRowBatchBase.size, selectedBitSet); - } else { - /*since ROW_IDs are not needed we didn't create the ColumnVectors to hold them but we - * still have to check if the data being read is committed as far as current - * reader (transactions) is concerned. Since here we are reading 'original' schema file, - * all rows in it have been created by the same txn, namely 'syntheticProps.syntheticTxnId' - */ - if (!validWriteIdList.isWriteIdValid(syntheticProps.syntheticTxnId)) { - selectedBitSet.clear(0, vectorizedRowBatchBase.size); - } - } - } - } - else { + if (isOriginal) { + // Handle synthetic row IDs for the original files. + innerRecordIdColumnVector = handleOriginalFile(selectedBitSet, innerRecordIdColumnVector); + } else { // Case 1- find rows which belong to transactions that are not valid. findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet); } @@ -505,29 +446,103 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti } } - if(isOriginal) { - /*Just copy the payload. {@link recordIdColumnVector} has already been populated*/ - System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, - value.getDataColumnCount()); - } - else { - // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch. - StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW]; - // Transfer columnVector objects from base batch to outgoing batch. - System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount()); - if(rowIdProjected) { + if (isOriginal) { + /* Just copy the payload. {@link recordIdColumnVector} has already been populated */ + System.arraycopy(vectorizedRowBatchBase.cols, 0, value.cols, 0, value.getDataColumnCount()); + } else { + int payloadCol = OrcRecordUpdater.ROW; + if (isFlatPayload) { + // Ignore the struct column and just copy all the following data columns. 
+ System.arraycopy(vectorizedRowBatchBase.cols, payloadCol + 1, value.cols, 0, + vectorizedRowBatchBase.cols.length - payloadCol - 1); + } else { + StructColumnVector payloadStruct = + (StructColumnVector) vectorizedRowBatchBase.cols[payloadCol]; + // Transfer columnVector objects from base batch to outgoing batch. + System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount()); + } + if (rowIdProjected) { recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_WRITEID]; recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET]; recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID]; } } - if(rowIdProjected) { + if (rowIdProjected) { rbCtx.setRecordIdColumnVector(recordIdColumnVector); } progress = baseReader.getProgress(); return true; } + private ColumnVector[] handleOriginalFile( + BitSet selectedBitSet, ColumnVector[] innerRecordIdColumnVector) throws IOException { + /* + * If there are deletes and we are reading an original file, we must produce synthetic ROW_IDs + * in order to see if any deletes apply + */ + boolean needSyntheticRowId = + needSyntheticRowIds(true, !deleteEventRegistry.isEmpty(), rowIdProjected); + if(needSyntheticRowId) { + assert syntheticProps != null && syntheticProps.rowIdOffset >= 0 : "" + syntheticProps; + assert syntheticProps != null && syntheticProps.bucketProperty >= 0 : "" + syntheticProps; + if(innerReader == null) { + throw new IllegalStateException(getClass().getName() + " requires " + + org.apache.orc.RecordReader.class + + " to handle original files that require ROW__IDs: " + rootPath); + } + /** + * {@link RecordIdentifier#getWriteId()} + */ + recordIdColumnVector.fields[0].noNulls = true; + recordIdColumnVector.fields[0].isRepeating = true; + ((LongColumnVector)recordIdColumnVector.fields[0]).vector[0] = syntheticProps.syntheticTxnId; + /** + * This is {@link RecordIdentifier#getBucketProperty()} + * Also see {@link BucketCodec} + */ + recordIdColumnVector.fields[1].noNulls = true; + recordIdColumnVector.fields[1].isRepeating = true; + ((LongColumnVector)recordIdColumnVector.fields[1]).vector[0] = syntheticProps.bucketProperty; + /** + * {@link RecordIdentifier#getRowId()} + */ + recordIdColumnVector.fields[2].noNulls = true; + recordIdColumnVector.fields[2].isRepeating = false; + long[] rowIdVector = ((LongColumnVector)recordIdColumnVector.fields[2]).vector; + for(int i = 0; i < vectorizedRowBatchBase.size; i++) { + // baseReader.getRowNumber() seems to point at the start of the batch; TODO: validate + rowIdVector[i] = syntheticProps.rowIdOffset + innerReader.getRowNumber() + i; + } + // Now populate a structure used to apply delete events + innerRecordIdColumnVector = new ColumnVector[OrcRecordUpdater.FIELDS]; + innerRecordIdColumnVector[OrcRecordUpdater.ORIGINAL_WRITEID] = recordIdColumnVector.fields[0]; + innerRecordIdColumnVector[OrcRecordUpdater.BUCKET] = recordIdColumnVector.fields[1]; + innerRecordIdColumnVector[OrcRecordUpdater.ROW_ID] = recordIdColumnVector.fields[2]; + // these are insert events, so original txn == current txn for all rows + innerRecordIdColumnVector[OrcRecordUpdater.CURRENT_WRITEID] = recordIdColumnVector.fields[0]; + } + if(syntheticProps.syntheticTxnId > 0) { + // "originals" (written before the table was converted to acid) are considered written by + // txnid:0, which is always committed, so there is no need to check against invalid transactions. + // But originals written by Load Data, for example, can be in base_x or delta_x_x, so we must
+ // check if 'x' is committed or not even if ROW_ID is not needed in the Operator pipeline. + if (needSyntheticRowId) { + findRecordsWithInvalidTransactionIds(innerRecordIdColumnVector, + vectorizedRowBatchBase.size, selectedBitSet); + } else { + /* Since ROW_IDs are not needed, we didn't create the ColumnVectors to hold them, but we + * still have to check whether the data being read is committed as far as the current + * reader (transactions) is concerned. Since here we are reading an 'original' schema file, + * all rows in it have been created by the same txn, namely 'syntheticProps.syntheticTxnId'. + */ + if (!validWriteIdList.isWriteIdValid(syntheticProps.syntheticTxnId)) { + selectedBitSet.clear(0, vectorizedRowBatchBase.size); + } + } + } + return innerRecordIdColumnVector; + } + private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { findRecordsWithInvalidTransactionIds(batch.cols, batch.size, selectedBitSet); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java index c1e55c7fda..f6b949e51b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReader.java @@ -34,14 +34,14 @@ * @param index Externally provided metadata (from metadata reader or external cache). * @param encodings Externally provided metadata (from metadata reader or external cache). * @param streams Externally provided metadata (from metadata reader or external cache). - * @param included The array of booleans indicating whether each column should be read. + * @param physicalFileIncludes The array of booleans indicating whether each column should be read. * @param colRgs Arrays of rgs, per column set to true in included, that are to be read. * null in each respective position means all rgs for this column need to be read. * @param consumer The sink for data that has been read. */ void readEncodedColumns(int stripeIx, StripeInformation stripe, OrcProto.RowIndex[] index, List encodings, - List streams, boolean[] included, boolean[] rgs, + List streams, boolean[] physicalFileIncludes, boolean[] rgs, Consumer consumer) throws IOException; /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 32bdf6e68e..b4ff1e4b19 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -100,6 +100,8 @@ * 6) Given that RG end boundaries in ORC are estimates, we can request data from cache and then * not use it; thus, at the end we go thru all the MBs, and release those not released by (5). */ +// Note: this class should know nothing about ACID or schema evolution. It reads physical columns +// by index; schema evolution/ACID schema considerations should be handled at a higher level.
class EncodedReaderImpl implements EncodedReader { public static final Logger LOG = LoggerFactory.getLogger(EncodedReaderImpl.class); private static Field cleanerField; @@ -281,7 +283,7 @@ public String toString() { @Override public void readEncodedColumns(int stripeIx, StripeInformation stripe, OrcProto.RowIndex[] indexes, List encodings, - List streamList, boolean[] included, boolean[] rgs, + List streamList, boolean[] physicalFileIncludes, boolean[] rgs, Consumer consumer) throws IOException { // Note: for now we don't have to setError here, caller will setError if we throw. // We are also not supposed to call setDone, since we are only part of the operation. @@ -297,11 +299,11 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, // We assume stream list is sorted by column and that non-data // streams do not interleave data streams for the same column. // 1.2. With that in mind, determine disk ranges to read/get from cache (not by stream). - ColumnReadContext[] colCtxs = new ColumnReadContext[included.length]; + ColumnReadContext[] colCtxs = new ColumnReadContext[physicalFileIncludes.length]; int colRgIx = -1; // Don't create context for the 0-s column. - for (int i = 1; i < included.length; ++i) { - if (!included[i]) continue; + for (int i = 1; i < physicalFileIncludes.length; ++i) { + if (!physicalFileIncludes[i]) continue; ColumnEncoding enc = encodings.get(i); colCtxs[i] = new ColumnReadContext(i, enc, indexes[i], ++colRgIx); if (isTracingEnabled) { @@ -315,10 +317,10 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, long length = stream.getLength(); int colIx = stream.getColumn(); OrcProto.Stream.Kind streamKind = stream.getKind(); - if (!included[colIx] || StreamName.getArea(streamKind) != StreamName.Area.DATA) { + if (!physicalFileIncludes[colIx] || StreamName.getArea(streamKind) != StreamName.Area.DATA) { // We have a stream for included column, but in future it might have no data streams. // It's more like "has at least one column included that has an index stream". - hasIndexOnlyCols = hasIndexOnlyCols || included[colIx]; + hasIndexOnlyCols = hasIndexOnlyCols || physicalFileIncludes[colIx]; if (isTracingEnabled) { LOG.trace("Skipping stream for column " + colIx + ": " + streamKind + " at " + offset + ", " + length); @@ -357,7 +359,7 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, // TODO: there may be a bug here. Could there be partial RG filtering on index-only column? if (hasIndexOnlyCols && (rgs == null)) { OrcEncodedColumnBatch ecb = POOLS.ecbPool.take(); - ecb.init(fileKey, stripeIx, OrcEncodedColumnBatch.ALL_RGS, included.length); + ecb.init(fileKey, stripeIx, OrcEncodedColumnBatch.ALL_RGS, physicalFileIncludes.length); try { consumer.consumeData(ecb); } catch (InterruptedException e) { @@ -399,7 +401,7 @@ public void readEncodedColumns(int stripeIx, StripeInformation stripe, trace.logStartRg(rgIx); boolean hasErrorForEcb = true; try { - ecb.init(fileKey, stripeIx, rgIx, included.length); + ecb.init(fileKey, stripeIx, rgIx, physicalFileIncludes.length); for (int colIx = 0; colIx < colCtxs.length; ++colIx) { ColumnReadContext ctx = colCtxs[colIx]; if (ctx == null) continue; // This column is not included. 
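(Illustrative sketch, not part of the patch: physicalFileIncludes above is indexed by column id in the ORC file schema, with index 0 reserved for the root struct, which is why the loop in readEncodedColumns starts at 1. Assuming ORC's TypeDescription API (getId/getMaximumId/getChildren), building such an includes array from top-level column indices could look roughly like the following; the class and method names here are made up for the example.)

import java.util.List;
import org.apache.orc.TypeDescription;

class PhysicalIncludesSketch {
  // Returns a boolean[] indexed by ORC file-schema column id: true for the root struct,
  // for each requested top-level column, and for that column's entire subtree.
  static boolean[] physicalIncludes(TypeDescription fileSchema, int[] topLevelColumns) {
    boolean[] includes = new boolean[fileSchema.getMaximumId() + 1];
    includes[0] = true; // column 0 is the root struct; readEncodedColumns creates no context for it
    List<TypeDescription> children = fileSchema.getChildren();
    for (int col : topLevelColumns) {
      TypeDescription child = children.get(col);
      for (int id = child.getId(); id <= child.getMaximumId(); ++id) {
        includes[id] = true; // for complex types, include the whole subtree
      }
    }
    return includes;
  }
}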
@@ -1815,21 +1817,21 @@ public String toString() { @Override public void readIndexStreams(OrcIndex index, StripeInformation stripe, - List streams, boolean[] included, boolean[] sargColumns) + List streams, boolean[] physicalFileIncludes, boolean[] sargColumns) throws IOException { long stripeOffset = stripe.getOffset(); - DiskRangeList indexRanges = planIndexReading( - fileSchema, streams, true, included, sargColumns, version, index.getBloomFilterKinds()); + DiskRangeList indexRanges = planIndexReading(fileSchema, streams, true, physicalFileIncludes, + sargColumns, version, index.getBloomFilterKinds()); if (indexRanges == null) { if (LOG.isDebugEnabled()) { LOG.debug("Nothing to read for stripe [" + stripe + "]"); } return; } - ReadContext[] colCtxs = new ReadContext[included.length]; + ReadContext[] colCtxs = new ReadContext[physicalFileIncludes.length]; int colRgIx = -1; - for (int i = 0; i < included.length; ++i) { - if (!included[i] && (sargColumns == null || !sargColumns[i])) continue; + for (int i = 0; i < physicalFileIncludes.length; ++i) { + if (!physicalFileIncludes[i] && (sargColumns == null || !sargColumns[i])) continue; colCtxs[i] = new ReadContext(i, ++colRgIx); if (isTracingEnabled) { LOG.trace("Creating context: " + colCtxs[i].toString()); @@ -1844,7 +1846,7 @@ public void readIndexStreams(OrcIndex index, StripeInformation stripe, // See planIndexReading - only read non-row-index streams if involved in SARGs. if ((StreamName.getArea(streamKind) == StreamName.Area.INDEX) && ((sargColumns != null && sargColumns[colIx]) - || (included[colIx] && streamKind == Kind.ROW_INDEX))) { + || (physicalFileIncludes[colIx] && streamKind == Kind.ROW_INDEX))) { trace.logAddStream(colIx, streamKind, offset, length, -1, true); colCtxs[colIx].addStream(offset, stream, -1); if (isTracingEnabled) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java index 1e7708e9ae..42532f9a0e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java @@ -20,8 +20,10 @@ import org.apache.orc.impl.RunLengthByteReader; import java.io.IOException; +import java.util.Arrays; import java.util.List; +import com.google.common.base.Preconditions; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -2099,35 +2101,27 @@ public static StreamReaderBuilder builder() { } } - public static StructTreeReader createRootTreeReader(TypeDescription schema, + public static StructTreeReader createRootTreeReader(TypeDescription[] batchSchemas, List encodings, OrcEncodedColumnBatch batch, - CompressionCodec codec, TreeReaderFactory.Context context, int[] columnMapping) - throws IOException { - if (schema.getCategory() != Category.STRUCT) { - throw new AssertionError("Schema is not a struct: " + schema); - } - // Some child types may be excluded. Note that this can only happen at root level.
- List children = schema.getChildren(); - int childCount = children.size(), includedCount = 0; - for (int childIx = 0; childIx < childCount; ++childIx) { - int batchColIx = children.get(childIx).getId(); + CompressionCodec codec, TreeReaderFactory.Context context) throws IOException { + // Note: we only look at the schema here to deal with complex types. Somebody has set up the + // reader with whatever ideas they had about the schema, and we just trust the reader to + // produce the CVBs that were asked for. However, we only need to look at top-level columns. + int includedCount = batch.getColumnsWithDataCount(); + if (batchSchemas.length > includedCount) { + throw new AssertionError("For " + Arrays.toString(batchSchemas) + ", only received " + + includedCount + " columns"); + } + TreeReader[] childReaders = new TreeReader[batchSchemas.length]; + for (int i = 0; i < batchSchemas.length; ++i) { + int batchColIx = batchSchemas[i].getId(); if (!batch.hasData(batchColIx) && !batch.hasVectors(batchColIx)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Column at " + childIx + " " + children.get(childIx).getId() - + ":" + children.get(childIx).toString() + " has no data"); - } - continue; + throw new AssertionError("No data for column " + batchColIx + ": " + batchSchemas[i]); } - ++includedCount; - } - TreeReader[] childReaders = new TreeReader[includedCount]; - for (int schemaChildIx = 0, inclChildIx = -1; schemaChildIx < childCount; ++schemaChildIx) { - int batchColIx = children.get(schemaChildIx).getId(); - if (!batch.hasData(batchColIx) && !batch.hasVectors(batchColIx)) continue; - childReaders[++inclChildIx] = createEncodedTreeReader( - schema.getChildren().get(schemaChildIx), encodings, batch, codec, context); - columnMapping[inclChildIx] = schemaChildIx; + childReaders[i] = createEncodedTreeReader(batchSchemas[i], encodings, batch, codec, context); } + + // TODO: do we actually need this reader? The caller just extracts the child readers.
return StructStreamReader.builder() .setColumnIndex(0) .setCompressionCodec(codec) diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java index 50d10e33cd..025600b701 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java @@ -105,11 +105,10 @@ protected void resetColumnArrays(int columnCount) { if (columnVectors != null && columnCount == columnVectors.length) { Arrays.fill(columnVectors, null); return; - } if (columnVectors != null) { - columnVectors = new List[columnCount]; - } else { - columnVectors = null; } + if (columnVectors != null) { + columnVectors = new List[columnCount]; + } // else just keep it null } public boolean hasVectors(int colIx) { @@ -120,5 +119,14 @@ public boolean hasVectors(int colIx) { if (!hasVectors(colIx)) throw new AssertionError("No data for column " + colIx); return columnVectors[colIx]; } + + public int getColumnsWithDataCount() { + int childCount = hasData.length, result = 0; + for (int childIx = 0; childIx < childCount; ++childIx) { + if (!hasData(childIx) && !hasVectors(childIx)) continue; + ++result; + } + return result; + } } } diff --git ql/src/test/queries/clientpositive/llap_acid2.q ql/src/test/queries/clientpositive/llap_acid2.q new file mode 100644 index 0000000000..76f6203d73 --- /dev/null +++ ql/src/test/queries/clientpositive/llap_acid2.q @@ -0,0 +1,84 @@ +set hive.mapred.mode=nonstrict; +SET hive.vectorized.execution.enabled=true; + +SET hive.llap.io.enabled=false; + +SET hive.exec.orc.default.buffer.size=32768; +SET hive.exec.orc.default.row.index.stride=1000; +SET hive.optimize.index.filter=true; +set hive.fetch.task.conversion=none; + +set hive.exec.dynamic.partition.mode=nonstrict; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + +DROP TABLE orc_llap; + +CREATE TABLE orc_llap ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cint0 INT, + cbigint0 BIGINT, + cfloat0 FLOAT, + cdouble0 DOUBLE, + cint1 INT, + cbigint1 BIGINT, + cfloat1 FLOAT, + cdouble1 DOUBLE, + cstring1 string, + cfloat2 float +) stored as orc TBLPROPERTIES ('transactional'='true'); + + +insert into table orc_llap +select cint, cbigint, cfloat, cdouble, + cint as c1, cbigint as c2, cfloat as c3, cdouble as c4, + cint as c8, cbigint as c7, cfloat as c6, cdouble as c5, + cstring1, cfloat as c9 from alltypesorc order by cdouble asc limit 30; + + + + + +CREATE TABLE orc_llap2 ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cint0 INT, + cbigint0 BIGINT, + cfloat0 FLOAT, + cdouble0 DOUBLE, + cint1 INT, + cbigint1 BIGINT, + cfloat1 FLOAT, + cdouble1 DOUBLE, + cstring1 string, + cfloat2 float +) stored as orc TBLPROPERTIES ('transactional'='false'); + +insert into table orc_llap2 +select cint, cbigint, cfloat, cdouble, + cint as c1, cbigint as c2, cfloat as c3, cdouble as c4, + cint as c8, cbigint as c7, cfloat as c6, cdouble as c5, + cstring1, cfloat as c9 from alltypesorc order by cdouble asc limit 30; + +alter table orc_llap2 set TBLPROPERTIES ('transactional'='true'); + +update orc_llap2 set cstring1 = 'testvalue' where cstring1 = 'N016jPED08o'; + + +SET hive.llap.io.enabled=true; + +select cstring1 from orc_llap; +select cfloat2, cint from orc_llap; +select * from orc_llap; + +select cstring1 from orc_llap2; +select cfloat2, cint from orc_llap2; +select * from orc_llap2; + + +DROP TABLE orc_llap; diff --git 
ql/src/test/results/clientpositive/llap/llap_acid2.q.out ql/src/test/results/clientpositive/llap/llap_acid2.q.out new file mode 100644 index 0000000000..d2b8b45811 --- /dev/null +++ ql/src/test/results/clientpositive/llap/llap_acid2.q.out @@ -0,0 +1,392 @@ +PREHOOK: query: DROP TABLE orc_llap +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE orc_llap +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE orc_llap ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cint0 INT, + cbigint0 BIGINT, + cfloat0 FLOAT, + cdouble0 DOUBLE, + cint1 INT, + cbigint1 BIGINT, + cfloat1 FLOAT, + cdouble1 DOUBLE, + cstring1 string, + cfloat2 float +) stored as orc TBLPROPERTIES ('transactional'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_llap +POSTHOOK: query: CREATE TABLE orc_llap ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cint0 INT, + cbigint0 BIGINT, + cfloat0 FLOAT, + cdouble0 DOUBLE, + cint1 INT, + cbigint1 BIGINT, + cfloat1 FLOAT, + cdouble1 DOUBLE, + cstring1 string, + cfloat2 float +) stored as orc TBLPROPERTIES ('transactional'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_llap +PREHOOK: query: insert into table orc_llap +select cint, cbigint, cfloat, cdouble, + cint as c1, cbigint as c2, cfloat as c3, cdouble as c4, + cint as c8, cbigint as c7, cfloat as c6, cdouble as c5, + cstring1, cfloat as c9 from alltypesorc order by cdouble asc limit 30 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap +POSTHOOK: query: insert into table orc_llap +select cint, cbigint, cfloat, cdouble, + cint as c1, cbigint as c2, cfloat as c3, cdouble as c4, + cint as c8, cbigint as c7, cfloat as c6, cdouble as c5, + cstring1, cfloat as c9 from alltypesorc order by cdouble asc limit 30 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap +POSTHOOK: Lineage: orc_llap.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap.cbigint0 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap.cbigint1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap.cdouble0 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap.cdouble1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap.cfloat0 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap.cfloat1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap.cfloat2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: orc_llap.cint0 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: orc_llap.cint1 SIMPLE 
[(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: orc_llap.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, comment:null), ] +PREHOOK: query: CREATE TABLE orc_llap2 ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cint0 INT, + cbigint0 BIGINT, + cfloat0 FLOAT, + cdouble0 DOUBLE, + cint1 INT, + cbigint1 BIGINT, + cfloat1 FLOAT, + cdouble1 DOUBLE, + cstring1 string, + cfloat2 float +) stored as orc TBLPROPERTIES ('transactional'='false') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_llap2 +POSTHOOK: query: CREATE TABLE orc_llap2 ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE, + cint0 INT, + cbigint0 BIGINT, + cfloat0 FLOAT, + cdouble0 DOUBLE, + cint1 INT, + cbigint1 BIGINT, + cfloat1 FLOAT, + cdouble1 DOUBLE, + cstring1 string, + cfloat2 float +) stored as orc TBLPROPERTIES ('transactional'='false') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_llap2 +PREHOOK: query: insert into table orc_llap2 +select cint, cbigint, cfloat, cdouble, + cint as c1, cbigint as c2, cfloat as c3, cdouble as c4, + cint as c8, cbigint as c7, cfloat as c6, cdouble as c5, + cstring1, cfloat as c9 from alltypesorc order by cdouble asc limit 30 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap2 +POSTHOOK: query: insert into table orc_llap2 +select cint, cbigint, cfloat, cdouble, + cint as c1, cbigint as c2, cfloat as c3, cdouble as c4, + cint as c8, cbigint as c7, cfloat as c6, cdouble as c5, + cstring1, cfloat as c9 from alltypesorc order by cdouble asc limit 30 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap2 +POSTHOOK: Lineage: orc_llap2.cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap2.cbigint0 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap2.cbigint1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap2.cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap2.cdouble0 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap2.cdouble1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap2.cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap2.cfloat0 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap2.cfloat1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap2.cfloat2 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap2.cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: orc_llap2.cint0 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: orc_llap2.cint1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +POSTHOOK: Lineage: orc_llap2.cstring1 SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cstring1, type:string, 
comment:null), ] +PREHOOK: query: alter table orc_llap2 set TBLPROPERTIES ('transactional'='true') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@orc_llap2 +PREHOOK: Output: default@orc_llap2 +POSTHOOK: query: alter table orc_llap2 set TBLPROPERTIES ('transactional'='true') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@orc_llap2 +POSTHOOK: Output: default@orc_llap2 +PREHOOK: query: update orc_llap2 set cstring1 = 'testvalue' where cstring1 = 'N016jPED08o' +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap2 +PREHOOK: Output: default@orc_llap2 +POSTHOOK: query: update orc_llap2 set cstring1 = 'testvalue' where cstring1 = 'N016jPED08o' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap2 +POSTHOOK: Output: default@orc_llap2 +PREHOOK: query: select cstring1 from orc_llap +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select cstring1 from orc_llap +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +N016jPED08o +Q1JAdUlCVORmR0Q5X5Vf5u6 +eNsh5tYa +5j7GJ8OCXgMVIcK7 +uJGHsW3cd073NGFITyQ +G1u0pUmU6ehCm +mk6lShdOa8kXT8i7mLd3fK +u5C7glqT5XqtO0JE2686lk1 +h4omSc1jcLLwW +tFY2ng51v +vmAT10eeE47fgH20pLi +uN803aW +qqbDw46IgGds4 +32v414p63Jv1B4tO1xy +73xdw4X +d3o1712a03n20qvi62U7 +eQ80MW0h728I204P87YXc +KHtD2A2hp6OjFgS73gdgE +nI30tm7U55O0gI +LSJtFA66 +mby00c +meGb5 +pM6Gt05s1YJeii +LR2AKy0dPt8vFdIV5760jriw +1B3WMD5LSk65B2Moa +xTlDv24JYv4s +28Oe6r21yux7Lk47 +7wH3hBKdO55Xq3gEEe0 +5QLs0LVK1g +ET3d4F2I4lV +PREHOOK: query: select cfloat2, cint from orc_llap +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select cfloat2, cint from orc_llap +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +NULL -838810013 +NULL 246423894 +NULL 708885482 +NULL 186967185 +NULL -595277064 +NULL 584923170 +NULL 518213127 +NULL -334595454 +NULL 241008004 +NULL 185212032 +NULL -738747840 +NULL -971543377 +NULL 940448896 +NULL -324030556 +NULL -899422227 +11.0 835111400 +11.0 -775326158 +11.0 653630202 +11.0 779427499 +11.0 797003983 +11.0 31832752 +11.0 783790031 +11.0 -898241885 +11.0 NULL +11.0 -646295381 +11.0 130912195 +11.0 -391573084 +11.0 385623629 +11.0 681126962 +11.0 25892751 +PREHOOK: query: select * from orc_llap +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +#### A masked pattern was here #### +POSTHOOK: query: select * from orc_llap +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +#### A masked pattern was here #### +-838810013 1864027286 NULL NULL -838810013 1864027286 NULL NULL -838810013 1864027286 NULL NULL N016jPED08o NULL +246423894 -1645852809 NULL NULL 246423894 -1645852809 NULL NULL 246423894 -1645852809 NULL NULL Q1JAdUlCVORmR0Q5X5Vf5u6 NULL +708885482 -1645852809 NULL NULL 708885482 -1645852809 NULL NULL 708885482 -1645852809 NULL NULL eNsh5tYa NULL +186967185 -1645852809 NULL NULL 186967185 -1645852809 NULL NULL 186967185 -1645852809 NULL NULL 5j7GJ8OCXgMVIcK7 NULL +-595277064 -1645852809 NULL NULL -595277064 -1645852809 NULL NULL -595277064 -1645852809 NULL NULL uJGHsW3cd073NGFITyQ NULL +584923170 -1645852809 NULL NULL 584923170 -1645852809 NULL NULL 584923170 -1645852809 NULL NULL G1u0pUmU6ehCm NULL +518213127 -1645852809 NULL NULL 518213127 -1645852809 NULL NULL 518213127 -1645852809 NULL NULL mk6lShdOa8kXT8i7mLd3fK NULL +-334595454 -1645852809 NULL NULL -334595454 -1645852809 NULL NULL -334595454 -1645852809 NULL NULL 
u5C7glqT5XqtO0JE2686lk1 NULL +241008004 -1645852809 NULL NULL 241008004 -1645852809 NULL NULL 241008004 -1645852809 NULL NULL h4omSc1jcLLwW NULL +185212032 -1645852809 NULL NULL 185212032 -1645852809 NULL NULL 185212032 -1645852809 NULL NULL tFY2ng51v NULL +-738747840 -1645852809 NULL NULL -738747840 -1645852809 NULL NULL -738747840 -1645852809 NULL NULL vmAT10eeE47fgH20pLi NULL +-971543377 -1645852809 NULL NULL -971543377 -1645852809 NULL NULL -971543377 -1645852809 NULL NULL uN803aW NULL +940448896 -1645852809 NULL NULL 940448896 -1645852809 NULL NULL 940448896 -1645852809 NULL NULL qqbDw46IgGds4 NULL +-324030556 -1645852809 NULL NULL -324030556 -1645852809 NULL NULL -324030556 -1645852809 NULL NULL 32v414p63Jv1B4tO1xy NULL +-899422227 -1645852809 NULL NULL -899422227 -1645852809 NULL NULL -899422227 -1645852809 NULL NULL 73xdw4X NULL +835111400 1964238982 11.0 NULL 835111400 1964238982 11.0 NULL 835111400 1964238982 11.0 NULL d3o1712a03n20qvi62U7 11.0 +-775326158 -1289793978 11.0 NULL -775326158 -1289793978 11.0 NULL -775326158 -1289793978 11.0 NULL eQ80MW0h728I204P87YXc 11.0 +653630202 1281184487 11.0 NULL 653630202 1281184487 11.0 NULL 653630202 1281184487 11.0 NULL KHtD2A2hp6OjFgS73gdgE 11.0 +779427499 1326393090 11.0 NULL 779427499 1326393090 11.0 NULL 779427499 1326393090 11.0 NULL nI30tm7U55O0gI 11.0 +797003983 1186689849 11.0 NULL 797003983 1186689849 11.0 NULL 797003983 1186689849 11.0 NULL LSJtFA66 11.0 +31832752 1854212271 11.0 NULL 31832752 1854212271 11.0 NULL 31832752 1854212271 11.0 NULL mby00c 11.0 +783790031 -1482854823 11.0 NULL 783790031 -1482854823 11.0 NULL 783790031 -1482854823 11.0 NULL meGb5 11.0 +-898241885 -1785664982 11.0 NULL -898241885 -1785664982 11.0 NULL -898241885 -1785664982 11.0 NULL pM6Gt05s1YJeii 11.0 +NULL -1083386085 11.0 NULL NULL -1083386085 11.0 NULL NULL -1083386085 11.0 NULL LR2AKy0dPt8vFdIV5760jriw 11.0 +-646295381 -1654635859 11.0 NULL -646295381 -1654635859 11.0 NULL -646295381 -1654635859 11.0 NULL 1B3WMD5LSk65B2Moa 11.0 +130912195 -1286145901 11.0 NULL 130912195 -1286145901 11.0 NULL 130912195 -1286145901 11.0 NULL xTlDv24JYv4s 11.0 +-391573084 -236100834 11.0 NULL -391573084 -236100834 11.0 NULL -391573084 -236100834 11.0 NULL 28Oe6r21yux7Lk47 11.0 +385623629 236101682 11.0 NULL 385623629 236101682 11.0 NULL 385623629 236101682 11.0 NULL 7wH3hBKdO55Xq3gEEe0 11.0 +681126962 993392163 11.0 NULL 681126962 993392163 11.0 NULL 681126962 993392163 11.0 NULL 5QLs0LVK1g 11.0 +25892751 -1978674520 11.0 NULL 25892751 -1978674520 11.0 NULL 25892751 -1978674520 11.0 NULL ET3d4F2I4lV 11.0 +PREHOOK: query: select cstring1 from orc_llap2 +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap2 +#### A masked pattern was here #### +POSTHOOK: query: select cstring1 from orc_llap2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap2 +#### A masked pattern was here #### +Q1JAdUlCVORmR0Q5X5Vf5u6 +eNsh5tYa +5j7GJ8OCXgMVIcK7 +uJGHsW3cd073NGFITyQ +G1u0pUmU6ehCm +mk6lShdOa8kXT8i7mLd3fK +u5C7glqT5XqtO0JE2686lk1 +h4omSc1jcLLwW +tFY2ng51v +vmAT10eeE47fgH20pLi +uN803aW +qqbDw46IgGds4 +32v414p63Jv1B4tO1xy +73xdw4X +d3o1712a03n20qvi62U7 +eQ80MW0h728I204P87YXc +KHtD2A2hp6OjFgS73gdgE +nI30tm7U55O0gI +LSJtFA66 +mby00c +meGb5 +pM6Gt05s1YJeii +LR2AKy0dPt8vFdIV5760jriw +1B3WMD5LSk65B2Moa +xTlDv24JYv4s +28Oe6r21yux7Lk47 +7wH3hBKdO55Xq3gEEe0 +5QLs0LVK1g +ET3d4F2I4lV +testvalue +PREHOOK: query: select cfloat2, cint from orc_llap2 +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap2 +#### A masked pattern was here #### +POSTHOOK: query: select cfloat2, cint from 
orc_llap2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap2 +#### A masked pattern was here #### +NULL 246423894 +NULL 708885482 +NULL 186967185 +NULL -595277064 +NULL 584923170 +NULL 518213127 +NULL -334595454 +NULL 241008004 +NULL 185212032 +NULL -738747840 +NULL -971543377 +NULL 940448896 +NULL -324030556 +NULL -899422227 +11.0 835111400 +11.0 -775326158 +11.0 653630202 +11.0 779427499 +11.0 797003983 +11.0 31832752 +11.0 783790031 +11.0 -898241885 +11.0 NULL +11.0 -646295381 +11.0 130912195 +11.0 -391573084 +11.0 385623629 +11.0 681126962 +11.0 25892751 +NULL -838810013 +PREHOOK: query: select * from orc_llap2 +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap2 +#### A masked pattern was here #### +POSTHOOK: query: select * from orc_llap2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap2 +#### A masked pattern was here #### +246423894 -1645852809 NULL NULL 246423894 -1645852809 NULL NULL 246423894 -1645852809 NULL NULL Q1JAdUlCVORmR0Q5X5Vf5u6 NULL +708885482 -1645852809 NULL NULL 708885482 -1645852809 NULL NULL 708885482 -1645852809 NULL NULL eNsh5tYa NULL +186967185 -1645852809 NULL NULL 186967185 -1645852809 NULL NULL 186967185 -1645852809 NULL NULL 5j7GJ8OCXgMVIcK7 NULL +-595277064 -1645852809 NULL NULL -595277064 -1645852809 NULL NULL -595277064 -1645852809 NULL NULL uJGHsW3cd073NGFITyQ NULL +584923170 -1645852809 NULL NULL 584923170 -1645852809 NULL NULL 584923170 -1645852809 NULL NULL G1u0pUmU6ehCm NULL +518213127 -1645852809 NULL NULL 518213127 -1645852809 NULL NULL 518213127 -1645852809 NULL NULL mk6lShdOa8kXT8i7mLd3fK NULL +-334595454 -1645852809 NULL NULL -334595454 -1645852809 NULL NULL -334595454 -1645852809 NULL NULL u5C7glqT5XqtO0JE2686lk1 NULL +241008004 -1645852809 NULL NULL 241008004 -1645852809 NULL NULL 241008004 -1645852809 NULL NULL h4omSc1jcLLwW NULL +185212032 -1645852809 NULL NULL 185212032 -1645852809 NULL NULL 185212032 -1645852809 NULL NULL tFY2ng51v NULL +-738747840 -1645852809 NULL NULL -738747840 -1645852809 NULL NULL -738747840 -1645852809 NULL NULL vmAT10eeE47fgH20pLi NULL +-971543377 -1645852809 NULL NULL -971543377 -1645852809 NULL NULL -971543377 -1645852809 NULL NULL uN803aW NULL +940448896 -1645852809 NULL NULL 940448896 -1645852809 NULL NULL 940448896 -1645852809 NULL NULL qqbDw46IgGds4 NULL +-324030556 -1645852809 NULL NULL -324030556 -1645852809 NULL NULL -324030556 -1645852809 NULL NULL 32v414p63Jv1B4tO1xy NULL +-899422227 -1645852809 NULL NULL -899422227 -1645852809 NULL NULL -899422227 -1645852809 NULL NULL 73xdw4X NULL +835111400 1964238982 11.0 NULL 835111400 1964238982 11.0 NULL 835111400 1964238982 11.0 NULL d3o1712a03n20qvi62U7 11.0 +-775326158 -1289793978 11.0 NULL -775326158 -1289793978 11.0 NULL -775326158 -1289793978 11.0 NULL eQ80MW0h728I204P87YXc 11.0 +653630202 1281184487 11.0 NULL 653630202 1281184487 11.0 NULL 653630202 1281184487 11.0 NULL KHtD2A2hp6OjFgS73gdgE 11.0 +779427499 1326393090 11.0 NULL 779427499 1326393090 11.0 NULL 779427499 1326393090 11.0 NULL nI30tm7U55O0gI 11.0 +797003983 1186689849 11.0 NULL 797003983 1186689849 11.0 NULL 797003983 1186689849 11.0 NULL LSJtFA66 11.0 +31832752 1854212271 11.0 NULL 31832752 1854212271 11.0 NULL 31832752 1854212271 11.0 NULL mby00c 11.0 +783790031 -1482854823 11.0 NULL 783790031 -1482854823 11.0 NULL 783790031 -1482854823 11.0 NULL meGb5 11.0 +-898241885 -1785664982 11.0 NULL -898241885 -1785664982 11.0 NULL -898241885 -1785664982 11.0 NULL pM6Gt05s1YJeii 11.0 +NULL -1083386085 11.0 NULL NULL -1083386085 11.0 NULL NULL -1083386085 11.0 NULL 
LR2AKy0dPt8vFdIV5760jriw 11.0 +-646295381 -1654635859 11.0 NULL -646295381 -1654635859 11.0 NULL -646295381 -1654635859 11.0 NULL 1B3WMD5LSk65B2Moa 11.0 +130912195 -1286145901 11.0 NULL 130912195 -1286145901 11.0 NULL 130912195 -1286145901 11.0 NULL xTlDv24JYv4s 11.0 +-391573084 -236100834 11.0 NULL -391573084 -236100834 11.0 NULL -391573084 -236100834 11.0 NULL 28Oe6r21yux7Lk47 11.0 +385623629 236101682 11.0 NULL 385623629 236101682 11.0 NULL 385623629 236101682 11.0 NULL 7wH3hBKdO55Xq3gEEe0 11.0 +681126962 993392163 11.0 NULL 681126962 993392163 11.0 NULL 681126962 993392163 11.0 NULL 5QLs0LVK1g 11.0 +25892751 -1978674520 11.0 NULL 25892751 -1978674520 11.0 NULL 25892751 -1978674520 11.0 NULL ET3d4F2I4lV 11.0 +-838810013 1864027286 NULL NULL -838810013 1864027286 NULL NULL -838810013 1864027286 NULL NULL testvalue NULL +PREHOOK: query: DROP TABLE orc_llap +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@orc_llap +PREHOOK: Output: default@orc_llap +POSTHOOK: query: DROP TABLE orc_llap +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@orc_llap +POSTHOOK: Output: default@orc_llap diff --git storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java index aa2615060a..29a3b0f2f4 100644 --- storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java +++ storage-api/src/java/org/apache/hadoop/hive/common/io/encoded/EncodedColumnBatch.java @@ -97,7 +97,10 @@ public String toString() { * For each column, streams are indexed by kind (for ORC), with missing elements being null. */ protected ColumnStreamData[][] columnData; - /** Indicates which columns have data. Correspond to columnData elements. */ + /** + * Indicates which columns have data. This is indexed by the column ID in ORC file schema; + * the indices that are not included will not have data. Correspond to columnData elements. + */ protected boolean[] hasData; public void reset() { @@ -143,9 +146,9 @@ public int getTotalColCount() { protected void resetColumnArrays(int columnCount) { if (hasData != null && columnCount == hasData.length) { Arrays.fill(hasData, false); - return; + } else { + hasData = new boolean[columnCount]; } - hasData = new boolean[columnCount]; ColumnStreamData[][] columnData = new ColumnStreamData[columnCount][]; if (this.columnData != null) { for (int i = 0; i < Math.min(columnData.length, this.columnData.length); ++i) {
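(Illustrative sketch, not part of the patch: the ACID reader hunks earlier in this patch clear bits in selectedBitSet for rows whose synthetic write id is not committed. One way such a BitSet could then be applied to a VectorizedRowBatch is through its selected[] indirection, as sketched below; the helper name applySelected is made up for the example.)

import java.util.BitSet;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

class SelectedBitSetSketch {
  // Keeps only the rows whose bits remain set, without copying any column data.
  static void applySelected(VectorizedRowBatch batch, BitSet selectedBitSet) {
    int outIx = 0;
    for (int row = selectedBitSet.nextSetBit(0);
        row >= 0 && row < batch.size;
        row = selectedBitSet.nextSetBit(row + 1)) {
      batch.selected[outIx++] = row;
    }
    batch.size = outIx;
    batch.selectedInUse = true;
  }
}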