diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 32a6418552..0edd777e32 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -189,6 +189,8 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\
   join_nullsafe.q,\
   leftsemijoin.q,\
   limit_pushdown.q,\
+  llap_acid.q,\
+  llap_acid_fast.q,\
   load_dyn_part1.q,\
   load_dyn_part2.q,\
   load_dyn_part3.q,\
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 79ec4edd50..1cf5f49bca 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -138,7 +138,7 @@ InputSplit split, JobConf job, Reporter reporter) throws IOException {
     boolean useLlapIo = true;
     if (split instanceof LlapAwareSplit) {
-      useLlapIo = ((LlapAwareSplit) split).canUseLlapIo();
+      useLlapIo = ((LlapAwareSplit) split).canUseLlapIo(job);
     }
     if (useLlapIo) return null;
@@ -170,9 +170,14 @@ static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveExcept
     RowSchema rowSchema = findTsOp(mapWork).getSchema();
     final List<String> colNames = new ArrayList<String>(rowSchema.getSignature().size());
     final List<TypeInfo> colTypes = new ArrayList<TypeInfo>(rowSchema.getSignature().size());
+    boolean hasRowId = false;
     for (ColumnInfo c : rowSchema.getSignature()) {
       String columnName = c.getInternalName();
-      if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(columnName)) continue;
+      if (VirtualColumn.ROWID.getName().equals(columnName)) {
+        hasRowId = true;
+      } else {
+        if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(columnName)) continue;
+      }
       colNames.add(columnName);
       colTypes.add(TypeInfoUtils.getTypeInfoFromTypeString(c.getTypeName()));
     }
@@ -190,10 +195,15 @@ static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveExcept
         }
       }
     }
-    // UNDONE: Virtual column support?
+    final VirtualColumn[] virtualColumns;
+    if (hasRowId) {
+      virtualColumns = new VirtualColumn[] {VirtualColumn.ROWID};
+    } else {
+      virtualColumns = new VirtualColumn[0];
+    }
     return new VectorizedRowBatchCtx(colNames.toArray(new String[colNames.size()]),
         colTypes.toArray(new TypeInfo[colTypes.size()]), null, partitionColumnCount,
-        new VirtualColumn[0], new String[0]);
+        virtualColumns, new String[0]);
   }
   static TableScanOperator findTsOp(MapWork mapWork) throws HiveException {
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index 5d93884e92..ad2d8e7ba2 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -164,7 +164,7 @@ private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split,
     // Create the consumer of encoded data; it will coordinate decoding to CVBs.
     feedback = rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames,
         counters, schema, sourceInputFormat, sourceSerDe, reporter, job,
-        mapWork.getPathToPartitionInfo());
+        mapWork.getPathToPartitionInfo(), rbCtx);
   }
   private static MapWork findMapWork(JobConf job) throws HiveException {
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index d08dfbbe98..48db8141b8 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -43,5 +44,6 @@ ReadPipeline createReadPipeline(Consumer consumer, FileSplit
       List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
       QueryFragmentCounters counters, TypeDescription readerSchema,
       InputFormat sourceInputFormat, Deserializer sourceSerDe, Reporter reporter,
-      JobConf job, Map<Path, PartitionDesc> parts) throws IOException;
+      JobConf job, Map<Path, PartitionDesc> parts, VectorizedRowBatchCtx rowBatchCtx)
+      throws IOException;
 }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
index 945aff31b3..6ee814b217 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java
@@ -83,8 +83,8 @@ public GenericColumnVectorProducer(SerDeLowLevelCacheImpl serdeCache,
   public ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
       List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
       QueryFragmentCounters counters, TypeDescription schema, InputFormat sourceInputFormat,
-      Deserializer sourceSerDe, Reporter reporter, JobConf job, Map<Path, PartitionDesc> parts)
-      throws IOException {
+      Deserializer sourceSerDe, Reporter reporter, JobConf job, Map<Path, PartitionDesc> parts,
+      VectorizedRowBatchCtx rowBatchCtx) throws IOException {
     cacheMetrics.incrCacheReadRequests();
     OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(
         consumer, columnIds.size(), false, counters, ioMetrics);
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 373af76cf6..b940be119e 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -19,14 +19,13 @@ package org.apache.hadoop.hive.llap.io.decode;
 import java.io.IOException;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.Pool;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -36,7 +35,13 @@ import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
+import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcAcidRowBatchReader;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -44,13 +49,17 @@ import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hive.common.util.FixedSizedObjectPool;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.OrcConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 public class OrcColumnVectorProducer implements ColumnVectorProducer {
+  private static final Logger LOG = LoggerFactory.getLogger(OrcColumnVectorProducer.class);
   private final MetadataCache metadataCache;
   private final LowLevelCache lowLevelCache;
@@ -84,14 +93,248 @@ public ReadPipeline createReadPipeline(
       Consumer<ColumnVectorBatch> consumer, FileSplit split, List<Integer> columnIds,
       SearchArgument sarg, String[] columnNames, QueryFragmentCounters counters,
       TypeDescription readerSchema, InputFormat unused0, Deserializer unused1,
-      Reporter reporter, JobConf job, Map<Path, PartitionDesc> unused2) throws IOException {
+      Reporter reporter, JobConf job, Map<Path, PartitionDesc> unused2,
+      VectorizedRowBatchCtx rowBatchCtx) throws IOException {
     cacheMetrics.incrCacheReadRequests();
-    OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
-        _skipCorrupt, counters, ioMetrics);
-    OrcEncodedDataReader reader = new OrcEncodedDataReader(
-        lowLevelCache, bufferManager, metadataCache, conf, job, split, columnIds, sarg,
-        columnNames, edc, counters, readerSchema, tracePool);
+
+    final boolean isAcidRead = HiveConf.getBoolVar(job,
+        HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
+
+    final boolean isOriginal;
+    final boolean hasBase;
+    final boolean hasDelta;
+
+    if (split instanceof OrcSplit) {
+      final OrcSplit orcSplit = (OrcSplit) split;
+      isOriginal = orcSplit.isOriginal();
+      hasBase = orcSplit.hasBase();
+      hasDelta = orcSplit.getDeltas() != null && !orcSplit.getDeltas().isEmpty();
+    } else {
+      isOriginal = true;
+      hasBase = false;
+      hasDelta = false;
+    }
+
+    final AcidUtils.AcidOperationalProperties acidOperationalProperties
+        = AcidUtils.getAcidOperationalProperties(job);
+    final boolean isSplitUpdate = acidOperationalProperties.isSplitUpdate();
+    OrcEncodedDataConsumer edc = null;
+    OrcEncodedDataReader reader = null;
+
+    if (isOriginal) {
+      if (!isAcidRead && !hasDelta) {
+        LOG.debug("Original only");
+        edc = new OrcEncodedDataConsumer(consumer, columnIds.size(), _skipCorrupt, counters,
+            ioMetrics);
+        reader = new OrcEncodedDataReader(lowLevelCache, bufferManager, metadataCache, conf, job,
+            split, columnIds, sarg, columnNames, edc, counters, readerSchema, tracePool);
+      }
+    } else {
+      if (hasBase) {
+        if (hasDelta) {
+          if (isSplitUpdate) {
+            LOG.debug("Base with delete deltas");
+            consumer = new AcidConsumer(new ProjectionConsumer(
+                consumer, columnIds), split, job, reporter, rowBatchCtx);
+            edc = new OrcEncodedDataConsumer(consumer, OrcRecordUpdater.FIELDS, _skipCorrupt,
+                counters, ioMetrics);
+            reader = new OrcEncodedDataReader(lowLevelCache, bufferManager, metadataCache, conf,
+                job, split, columnIds, sarg, columnNames, edc, counters, readerSchema, tracePool);
+          }
+        } else {
+          LOG.debug("Base only");
+          consumer = new AcidToNonAcidConsumer(new ProjectionConsumer(
+              consumer, columnIds), columnIds.size(), rowBatchCtx);
+          edc = new OrcEncodedDataConsumer(consumer, OrcRecordUpdater.FIELDS, _skipCorrupt,
+              counters, ioMetrics);
+          reader = new OrcEncodedDataReader(lowLevelCache, bufferManager, metadataCache, conf, job,
+              split, columnIds, sarg, columnNames, edc, counters, readerSchema, tracePool);
+        }
+      }
+    }
     edc.init(reader, reader, reader.getTrace());
     return edc;
   }
+
+  /**
+   * CvbAcidReader works as VectorizedOrcAcidRowBatchReader does, except it handles
+   * ColumnVectorBatch instead of VectorizedRowBatch
+   */
+  private static class CvbAcidReader extends VectorizedOrcAcidRowBatchReader {
+    CvbAcidReader(InputSplit inputSplit, JobConf conf, Reporter reporter,
+        VectorizedRowBatchCtx rbCtx) throws IOException {
+      super(inputSplit, conf, reporter, rbCtx);
+    }
+
+    boolean merge(ColumnVectorBatch input, ColumnVectorBatch output) throws IOException {
+      try {
+        // Check and update partition cols if necessary. Ideally, this should be done
+        // in CreateValue as the partition is constant per split. But since Hive uses
+        // CombineHiveRecordReader and
+        // as this does not call CreateValue for each new RecordReader it creates, this check is
+        // required in next()
+        if (addPartitionCols) {
+          if (partitionValues != null) {
+            rbCtx.addPartitionColsToBatch(output.cols, partitionValues);
+          }
+          addPartitionCols = false;
+        }
+      } catch (Exception e) {
+        throw new IOException("error iterating", e);
+      }
+
+      // Once we have read the VectorizedRowBatchBase from the file, there are two kinds of cases
+      // for which we might have to discard rows from the batch:
+      // Case 1- when the row is created by a transaction that is not valid, or
+      // Case 2- when the row has been deleted.
+      // We will go through the batch to discover rows which match any of the cases and specifically
+      // remove them from the selected vector. Of course, selectedInUse should also be set.
+
+      BitSet selectedBitSet = new BitSet(input.size);
+      selectedBitSet.set(0, input.size, true);
+
+      // Case 1- find rows which belong to transactions that are not valid.
+      findRecordsWithInvalidTransactionIds(input.cols, input.size, selectedBitSet);
+
+      // Case 2- find rows which have been deleted.
+      this.deleteEventRegistry.findDeletedRecords(input.cols, input.size, selectedBitSet);
+
+      if (selectedBitSet.cardinality() == input.size) {
+        output.size = input.size;
+      } else {
+        output.size = selectedBitSet.cardinality();
+        // This loop fills up the selected[] vector with all the index positions that are selected.
+        for (int setBitIndex = selectedBitSet.nextSetBit(0), selectedItr = 0;
+            setBitIndex >= 0;
+            setBitIndex = selectedBitSet.nextSetBit(setBitIndex+1), ++selectedItr) {
+          for (int i = 0; i < input.cols.length; i++) {
+            output.cols[i].setElement(selectedItr, setBitIndex, input.cols[i]);
+          }
+        }
+      }
+
+      // Finally, link up the columnVector from the base VectorizedRowBatch to outgoing batch.
+      // NOTE: We only link up the user columns and not the ACID metadata columns because this
+      // vectorized code path is not being used in cases of update/delete, when the metadata columns
+      // would be expected to be passed up the operator pipeline. This is because
+      // currently the update/delete specifically disable vectorized code paths.
+      // This happens at ql/exec/Utilities.java::3293 when it checks for mapWork.getVectorMode()
+      StructColumnVector payloadStruct = (StructColumnVector) input.cols[OrcRecordUpdater.ROW];
+      // Transfer columnVector objects from base batch to outgoing batch.
+      System.arraycopy(payloadStruct.fields, 0, output.cols, 0, rbCtx.getDataColumnCount());
+      if (rbCtx != null) {
+        recordIdColumnVector.fields[0] = input.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION];
+        recordIdColumnVector.fields[1] = input.cols[OrcRecordUpdater.BUCKET];
+        recordIdColumnVector.fields[2] = input.cols[OrcRecordUpdater.ROW_ID];
+        rbCtx.setRecordIdColumnVector(recordIdColumnVector);
+      }
+      return true;
+    }
+  }
+
+  private static abstract class AbstractConsumer implements Consumer<ColumnVectorBatch> {
+    protected final Consumer<ColumnVectorBatch> consumer;
+
+    AbstractConsumer(Consumer<ColumnVectorBatch> consumer) {
+      this.consumer = consumer;
+    }
+
+    @Override
+    public void setDone() {
+      consumer.setDone();
+    }
+
+    @Override
+    public void setError(Throwable t) {
+      consumer.setError(t);
+    }
+  }
+
+  /**
+   * Handles split updates in ColumnVectorBatch
+   */
+  private static class AcidConsumer extends AbstractConsumer {
+    private final CvbAcidReader acidReader;
+    private final ColumnVectorBatch output;
+
+    AcidConsumer(Consumer<ColumnVectorBatch> consumer, InputSplit split, JobConf job,
+        Reporter reporter, VectorizedRowBatchCtx rowBatchCtx) throws IOException {
+      super(consumer);
+      this.acidReader = new CvbAcidReader(split, job, reporter, rowBatchCtx);
+      final VectorizedRowBatch vrb = rowBatchCtx.createVectorizedRowBatch();
+      this.output = new ColumnVectorBatch(rowBatchCtx.getRowColumnTypeInfos().length);
+      this.output.cols = vrb.cols;
+    }
+
+    @Override
+    public void consumeData(ColumnVectorBatch input) {
+      try {
+        acidReader.merge(input, output);
+        // Pass to its next consumer
+        consumer.consumeData(output);
+      } catch (IOException e) {
+        consumer.setError(e);
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Converts an ACID ColumnVectorBatch to a non-ACID ColumnVectorBatch
+   */
+  private static class AcidToNonAcidConsumer extends AbstractConsumer {
+    private final ColumnVectorBatch output;
+    private final VectorizedRowBatchCtx rowBatchCtx;
+    private final StructColumnVector recordIdColumnVector;
+
+    AcidToNonAcidConsumer(Consumer<ColumnVectorBatch> consumer, int numColumn,
+        VectorizedRowBatchCtx rowBatchCtx) {
+      super(consumer);
+      this.output = new ColumnVectorBatch(numColumn);
+      this.rowBatchCtx = rowBatchCtx;
+      this.recordIdColumnVector =
+          new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, null, null, null);
+    }
+
+    @Override
+    public void consumeData(ColumnVectorBatch data) {
+      // Copy ACID columns
+      recordIdColumnVector.fields[0] = data.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION];
+      recordIdColumnVector.fields[1] = data.cols[OrcRecordUpdater.BUCKET];
+      recordIdColumnVector.fields[2] = data.cols[OrcRecordUpdater.ROW_ID];
+      rowBatchCtx.setRecordIdColumnVector(recordIdColumnVector);
+
+      final StructColumnVector row = (StructColumnVector) data.cols[OrcRecordUpdater.ROW];
+      output.size = data.size;
+      output.cols = row.fields;
+      consumer.consumeData(output);
+    }
+  }
+
+  /**
+   * Projects selected columns only
+   */
+  private static class ProjectionConsumer extends AbstractConsumer {
+    private final List<Integer> columnIds;
+    private final ColumnVectorBatch output;
+
+    ProjectionConsumer(Consumer<ColumnVectorBatch> consumer, List<Integer> columnIds) {
+      super(consumer);
+      this.columnIds = columnIds;
+      this.output = new ColumnVectorBatch(columnIds.size());
+    }
+
+    @Override
+    public void consumeData(ColumnVectorBatch data) {
+      // Copy only selected columns
+      output.size = data.size;
+      final int size = columnIds.size();
+      for (int i = 0; i < size; i++) {
+        output.cols[i] = data.cols[columnIds.get(i)];
+      }
+
+      // Pass the result to its next consumer
+      consumer.consumeData(output);
+    }
+  }
 }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index d048a57651..1b1f0b68f4 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -154,7 +154,8 @@ protected void decodeBatch(OrcEncodedColumnBatch batch,
       if (cvb.cols[idx] == null) {
         // Orc store rows inside a root struct (hive writes it this way).
         // When we populate column vectors we skip over the root struct.
-        cvb.cols[idx] = createColumn(schema.getChildren().get(columnMapping[idx]), batchSize);
+        cvb.cols[idx] = createColumn(schema.getChildren().get(columnMapping[idx]),
+            VectorizedRowBatch.DEFAULT_SIZE);
       }
       trace.logTreeReaderNextVector(idx);
       cvb.cols[idx].ensureSize(batchSize, false);
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 2e47a56169..9e6efa4909 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.OrcProto.BloomFilterIndex;
 import org.apache.orc.OrcProto.FileTail;
@@ -225,7 +226,18 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff
     if (readerSchema == null) {
       readerSchema = fileMetadata.getSchema();
     }
-    globalIncludes = OrcInputFormat.genIncludedColumns(readerSchema, includedColumnIds);
+    final boolean[] readerIncludes = OrcInputFormat.genIncludedColumns(readerSchema, includedColumnIds);
+    if (split instanceof OrcSplit) {
+      final OrcSplit orcSplit = (OrcSplit) split;
+      final List<AcidInputFormat.DeltaMetaData> deltas = orcSplit.getDeltas();
+      if (orcSplit.isOriginal() && (deltas == null || deltas.isEmpty())) {
+        globalIncludes = readerIncludes;
+      } else {
+        globalIncludes = OrcInputFormat.shiftReaderIncludedForAcid(readerIncludes);
+      }
+    } else {
+      globalIncludes = readerIncludes;
+    }
     // Do not allow users to override zero-copy setting. The rest can be taken from user config.
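The effect of the include-array shifting above is easiest to see with concrete values. The following is a minimal, self-contained sketch that mirrors the logic of OrcInputFormat.shiftReaderIncludedForAcid as it appears later in this patch; the standalone class, the helper name, and the hard-coded constant 6 (standing in for OrcRecordUpdater.FIELDS) are illustrative assumptions for the demo, not code from the patch.

import java.util.Arrays;

// Illustrative only: mirrors the ACID include-shifting logic, outside of Hive.
public class AcidIncludeShiftDemo {
  // Stands in for OrcRecordUpdater.FIELDS: operation, originalTransaction,
  // bucket, rowId, currentTransaction, row.
  private static final int ACID_META_FIELDS = 6;

  static boolean[] shiftForAcid(boolean[] included) {
    included[0] = true; // the base row struct is always needed
    boolean[] shifted = new boolean[included.length + ACID_META_FIELDS];
    Arrays.fill(shifted, 0, ACID_META_FIELDS, true); // always read the ACID metadata columns
    for (int i = 0; i < included.length; ++i) {
      shifted[i + ACID_META_FIELDS] = included[i];   // user columns move past the metadata
    }
    return shifted;
  }

  public static void main(String[] args) {
    // Reader includes for a 3-column user schema where only column 2 is projected.
    boolean[] readerIncludes = {false, false, true, false};
    System.out.println(Arrays.toString(shiftForAcid(readerIncludes)));
    // Prints: [true, true, true, true, true, true, true, false, true, false]
  }
}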
boolean useZeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(daemonConf); if (useZeroCopy != OrcConf.USE_ZEROCOPY.getBoolean(jobConf)) { @@ -233,7 +245,7 @@ public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager buff jobConf.setBoolean(OrcConf.USE_ZEROCOPY.getAttribute(), useZeroCopy); } this.jobConf = jobConf; - Reader.Options options = new Reader.Options(jobConf).include(globalIncludes); + Reader.Options options = new Reader.Options(jobConf).include(readerIncludes); evolution = new SchemaEvolution(fileMetadata.getSchema(), readerSchema, options); consumer.setFileMetadata(fileMetadata); consumer.setIncludedColumns(globalIncludes); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java index f4499d7ff1..a5bdbef56c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExpressionDescriptor.java @@ -76,6 +76,7 @@ INTERVAL_YEAR_MONTH (0x100), INTERVAL_DAY_TIME (0x200), BINARY (0x400), + STRUCT (0x800), DATETIME_FAMILY (DATE.value | TIMESTAMP.value), INTERVAL_FAMILY (INTERVAL_YEAR_MONTH.value | INTERVAL_DAY_TIME.value), INT_INTERVAL_YEAR_MONTH (INT_FAMILY.value | INTERVAL_YEAR_MONTH.value), @@ -122,6 +123,8 @@ public static ArgumentType fromHiveTypeName(String hiveTypeName) { return INTERVAL_YEAR_MONTH; } else if (lower.equals(serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME)) { return INTERVAL_DAY_TIME; + } else if (VectorizationContext.structTypePattern.matcher(lower).matches()) { + return STRUCT; } else if (lower.equals("void")) { // The old code let void through... return INT_FAMILY; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java index 23fdaa554f..fba17a8d7c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java @@ -188,7 +188,7 @@ private Object extractRowColumn(VectorizedRowBatch batch, int batchIndex, int lo colVector, typeInfos[logicalColumnIndex], objectInspectors[logicalColumnIndex], batchIndex); } - Object extractRowColumn( + public Object extractRowColumn( ColumnVector colVector, TypeInfo typeInfo, ObjectInspector objectInspector, int batchIndex) { if (colVector == null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java index e8c73a944a..26ab360f76 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java @@ -818,12 +818,11 @@ public void process(Writable value) throws HiveException { VectorizedRowBatch batch = (VectorizedRowBatch) value; numRows += batch.size; if (hasRowIdentifier) { - - // UNDONE: Pass ROW__ID STRUCT column through IO Context to get filled in by ACID reader - // UNDONE: Or, perhaps tell it to do it before calling us, ... - // UNDONE: For now, set column to NULL. 
- - setRowIdentiferToNull(batch); + if (batchContext.getRecordIdColumnVector() == null) { + setRowIdentiferToNull(batch); + } else { + batch.cols[rowIdentifierColumnNum] = batchContext.getRecordIdColumnVector(); + } } } oneRootOperator.process(value, 0); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java index fcebb6fbce..47d93331f7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java @@ -347,6 +347,9 @@ public void addProjectionColumn(String columnName, int vectorBatchColIndex) { public static final Pattern charVarcharTypePattern = Pattern.compile("char.*|varchar.*", Pattern.CASE_INSENSITIVE); + public static final Pattern structTypePattern = Pattern.compile("struct.*", + Pattern.CASE_INSENSITIVE); + //Map column number to type private OutputColumnManager ocm; @@ -2715,6 +2718,8 @@ static String getUndecoratedName(String hiveTypeName) throws HiveException { case INTERVAL_YEAR_MONTH: case INTERVAL_DAY_TIME: return hiveTypeName; + case STRUCT: + return "Struct"; default: throw new HiveException("Unexpected hive type name " + hiveTypeName); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index 90d1372aec..c48ad48047 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ -81,6 +81,7 @@ private int partitionColumnCount; private int virtualColumnCount; private VirtualColumn[] neededVirtualColumns; + private StructColumnVector recordIdColumnVector; private String[] scratchColumnTypeNames; @@ -136,6 +137,14 @@ public int getVirtualColumnCount() { return scratchColumnTypeNames; } + public StructColumnVector getRecordIdColumnVector() { + return this.recordIdColumnVector; + } + + public void setRecordIdColumnVector(StructColumnVector recordIdColumnVector) { + this.recordIdColumnVector = recordIdColumnVector; + } + /** * Initializes the VectorizedRowBatch context based on an scratch column type names and * object inspector. 
@@ -274,6 +283,11 @@ public VectorizedRowBatch createVectorizedRowBatch() */ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partitionValues) { + addPartitionColsToBatch(batch.cols, partitionValues); + } + + public void addPartitionColsToBatch(ColumnVector[] cols, Object[] partitionValues) + { if (partitionValues != null) { for (int i = 0; i < partitionColumnCount; i++) { Object value = partitionValues[i]; @@ -283,7 +297,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) rowColumnTypeInfos[colIndex]; switch (primitiveTypeInfo.getPrimitiveCategory()) { case BOOLEAN: { - LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex]; + LongColumnVector lcv = (LongColumnVector) cols[colIndex]; if (value == null) { lcv.noNulls = false; lcv.isNull[0] = true; @@ -296,7 +310,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition break; case BYTE: { - LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex]; + LongColumnVector lcv = (LongColumnVector) cols[colIndex]; if (value == null) { lcv.noNulls = false; lcv.isNull[0] = true; @@ -309,7 +323,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition break; case SHORT: { - LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex]; + LongColumnVector lcv = (LongColumnVector) cols[colIndex]; if (value == null) { lcv.noNulls = false; lcv.isNull[0] = true; @@ -322,7 +336,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition break; case INT: { - LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex]; + LongColumnVector lcv = (LongColumnVector) cols[colIndex]; if (value == null) { lcv.noNulls = false; lcv.isNull[0] = true; @@ -335,7 +349,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition break; case LONG: { - LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex]; + LongColumnVector lcv = (LongColumnVector) cols[colIndex]; if (value == null) { lcv.noNulls = false; lcv.isNull[0] = true; @@ -348,7 +362,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition break; case DATE: { - LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex]; + LongColumnVector lcv = (LongColumnVector) cols[colIndex]; if (value == null) { lcv.noNulls = false; lcv.isNull[0] = true; @@ -361,7 +375,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition break; case TIMESTAMP: { - TimestampColumnVector lcv = (TimestampColumnVector) batch.cols[colIndex]; + TimestampColumnVector lcv = (TimestampColumnVector) cols[colIndex]; if (value == null) { lcv.noNulls = false; lcv.isNull[0] = true; @@ -374,7 +388,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition break; case INTERVAL_YEAR_MONTH: { - LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex]; + LongColumnVector lcv = (LongColumnVector) cols[colIndex]; if (value == null) { lcv.noNulls = false; lcv.isNull[0] = true; @@ -386,7 +400,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition } case INTERVAL_DAY_TIME: { - IntervalDayTimeColumnVector icv = (IntervalDayTimeColumnVector) batch.cols[colIndex]; + IntervalDayTimeColumnVector icv = (IntervalDayTimeColumnVector) cols[colIndex]; if (value == null) { icv.noNulls = false; icv.isNull[0] = true; @@ -398,7 +412,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] 
partition } case FLOAT: { - DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[colIndex]; + DoubleColumnVector dcv = (DoubleColumnVector) cols[colIndex]; if (value == null) { dcv.noNulls = false; dcv.isNull[0] = true; @@ -411,7 +425,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition break; case DOUBLE: { - DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[colIndex]; + DoubleColumnVector dcv = (DoubleColumnVector) cols[colIndex]; if (value == null) { dcv.noNulls = false; dcv.isNull[0] = true; @@ -424,7 +438,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition break; case DECIMAL: { - DecimalColumnVector dv = (DecimalColumnVector) batch.cols[colIndex]; + DecimalColumnVector dv = (DecimalColumnVector) cols[colIndex]; if (value == null) { dv.noNulls = false; dv.isNull[0] = true; @@ -439,7 +453,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition break; case BINARY: { - BytesColumnVector bcv = (BytesColumnVector) batch.cols[colIndex]; + BytesColumnVector bcv = (BytesColumnVector) cols[colIndex]; byte[] bytes = (byte[]) value; if (bytes == null) { bcv.noNulls = false; @@ -455,7 +469,7 @@ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partition case STRING: case CHAR: case VARCHAR: { - BytesColumnVector bcv = (BytesColumnVector) batch.cols[colIndex]; + BytesColumnVector bcv = (BytesColumnVector) cols[colIndex]; String sVal = value.toString(); if (sVal == null) { bcv.noNulls = false; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java index 1fb70f87e2..d8df5cc9be 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java @@ -61,6 +61,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableShortObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableStringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.Text; import org.apache.hive.common.util.DateUtils; @@ -1462,12 +1464,18 @@ public Object setValue(Object row, ColumnVector column, int columnRow) private static VectorExpressionWriter genVectorExpressionWritableStruct( SettableStructObjectInspector fieldObjInspector) throws HiveException { - return new VectorExpressionWriterMap() { + return new VectorExpressionWriterStruct() { private Object obj; + private VectorExtractRow vectorExtractRow; + private StructTypeInfo structTypeInfo; - public VectorExpressionWriter init(SettableStructObjectInspector objInspector) throws HiveException { + public VectorExpressionWriter init(SettableStructObjectInspector objInspector) + throws HiveException { super.init(objInspector); obj = initValue(null); + vectorExtractRow = new VectorExtractRow(); + structTypeInfo = (StructTypeInfo) + TypeInfoUtils.getTypeInfoFromTypeString(objInspector.getTypeName()); return this; } @@ -1477,15 +1485,43 @@ public Object initValue(Object ignored) { } @Override - public Object writeValue(ColumnVector column, int row) - throws HiveException { - 
throw new HiveException("Not implemented yet"); + public Object writeValue(ColumnVector column, int row) throws HiveException { + final StructColumnVector structColVector = (StructColumnVector) column; + final SettableStructObjectInspector structOI = + (SettableStructObjectInspector) this.objectInspector; + final List fields = structOI.getAllStructFieldRefs(); + final List fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos(); + + final int fieldSize = fields.size(); + for (int i = 0; i < fieldSize; i++) { + final StructField structField = fields.get(i); + final Object value = vectorExtractRow.extractRowColumn(structColVector.fields[i], + fieldTypeInfos.get(i), structField.getFieldObjectInspector(), row); + structOI.setStructFieldData(obj, structField, value); + } + return this.obj; } @Override - public Object setValue(Object row, ColumnVector column, int columnRow) - throws HiveException { - throw new HiveException("Not implemented yet"); + public Object setValue(Object field, ColumnVector column, int row) throws HiveException { + if (null == field) { + field = initValue(null); + } + + final StructColumnVector structColVector = (StructColumnVector) column; + final SettableStructObjectInspector structOI = + (SettableStructObjectInspector) this.objectInspector; + final List fields = structOI.getAllStructFieldRefs(); + final List fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos(); + + final int fieldSize = fields.size(); + for (int i = 0; i < fieldSize; i++) { + final StructField structField = fields.get(i); + final Object value = vectorExtractRow.extractRowColumn(structColVector.fields[i], + fieldTypeInfos.get(i), structField.getFieldObjectInspector(), row); + structOI.setStructFieldData(obj, structField, value); + } + return field; } }.init(fieldObjInspector); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/LlapAwareSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/LlapAwareSplit.java index ead4678f64..20e134541c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/LlapAwareSplit.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/LlapAwareSplit.java @@ -17,11 +17,13 @@ */ package org.apache.hadoop.hive.ql.io; +import org.apache.hadoop.conf.Configuration; + /** * Split that is aware that it could be executed in LLAP. Allows LlapInputFormat to do * a last-minute check to see of LLAP IO pipeline should be used for this particular split. * By default, there is no such check - whatever is sent in is attempted with LLAP IO. 
 */
 public interface LlapAwareSplit {
-  boolean canUseLlapIo();
+  boolean canUseLlapIo(Configuration conf);
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index c5d9b7bb9e..b5f3eaaf70 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1607,20 +1607,19 @@ private long computeProjectionSize(List fileTypes,
       }
       return ReaderImpl.getRawDataSizeFromColIndices(internalColIds, fileTypes, stats);
     }
+  }
-    private boolean[] shiftReaderIncludedForAcid(boolean[] included) {
-      // We always need the base row
-      included[0] = true;
-      boolean[] newIncluded = new boolean[included.length + OrcRecordUpdater.FIELDS];
-      Arrays.fill(newIncluded, 0, OrcRecordUpdater.FIELDS, true);
-      for(int i= 0; i < included.length; ++i) {
-        newIncluded[i + OrcRecordUpdater.FIELDS] = included[i];
-      }
-      return newIncluded;
+  public static boolean[] shiftReaderIncludedForAcid(boolean[] included) {
+    // We always need the base row
+    included[0] = true;
+    boolean[] newIncluded = new boolean[included.length + OrcRecordUpdater.FIELDS];
+    Arrays.fill(newIncluded, 0, OrcRecordUpdater.FIELDS, true);
+    for (int i = 0; i < included.length; ++i) {
+      newIncluded[i + OrcRecordUpdater.FIELDS] = included[i];
     }
+    return newIncluded;
   }
-
   /** Class intended to update two values from methods... Java-related cruft. */
   @VisibleForTesting
   static final class CombinedCtx {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index 1e19a911a6..9dcca071e5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -71,13 +71,13 @@ final static int UPDATE_OPERATION = 1;
   final static int DELETE_OPERATION = 2;
-  final static int OPERATION = 0;
-  final static int ORIGINAL_TRANSACTION = 1;
-  final static int BUCKET = 2;
-  final static int ROW_ID = 3;
-  final static int CURRENT_TRANSACTION = 4;
-  final static int ROW = 5;
-  final static int FIELDS = 6;
+  public final static int OPERATION = 0;
+  public final static int ORIGINAL_TRANSACTION = 1;
+  public final static int BUCKET = 2;
+  public final static int ROW_ID = 3;
+  public final static int CURRENT_TRANSACTION = 4;
+  public final static int ROW = 5;
+  public final static int FIELDS = 6;
   final static int DELTA_BUFFER_SIZE = 16 * 1024;
   final static long DELTA_STRIPE_SIZE = 16 * 1024 * 1024;
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 37aaeb6dde..ddce6ecc9b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -26,8 +26,11 @@ import java.util.ArrayList;
 import java.util.List;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
@@ -225,8 +228,33 @@ public long getColumnarProjectionSize() {
   }
   @Override
-  public boolean canUseLlapIo() {
-    return isOriginal && (deltas == null || deltas.isEmpty());
+  public boolean canUseLlapIo(Configuration conf) {
+    final boolean hasDelta = deltas != null && !deltas.isEmpty();
+    final boolean isAcidRead = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
+    final AcidUtils.AcidOperationalProperties acidOperationalProperties
+        = AcidUtils.getAcidOperationalProperties(conf);
+    final boolean isSplitUpdate = acidOperationalProperties.isSplitUpdate();
+
+    if (isOriginal) {
+      if (!isAcidRead && !hasDelta) {
+        // Original scan only
+        return true;
+      }
+    } else {
+      if (hasBase) {
+        if (hasDelta) {
+          if (isSplitUpdate) {
+            // Base with delete deltas
+            return true;
+          }
+        } else {
+          // Base scan only
+          return true;
+        }
+      }
+    }
+
+    return false;
   }
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 138e56eb0c..4d3ec79d0c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -62,20 +63,77 @@
   private static final Logger LOG = LoggerFactory.getLogger(VectorizedOrcAcidRowBatchReader.class);
-  private org.apache.hadoop.hive.ql.io.orc.RecordReader baseReader;
-  private VectorizedRowBatchCtx rbCtx;
-  private VectorizedRowBatch vectorizedRowBatchBase;
+  public org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader;
+  protected VectorizedRowBatchCtx rbCtx;
+  protected VectorizedRowBatch vectorizedRowBatchBase;
   private long offset;
   private long length;
-  private float progress = 0.0f;
-  private Object[] partitionValues;
-  private boolean addPartitionCols = true;
+  protected float progress = 0.0f;
+  protected Object[] partitionValues;
+  protected boolean addPartitionCols = true;
   private ValidTxnList validTxnList;
-  private DeleteEventRegistry deleteEventRegistry;
+  protected DeleteEventRegistry deleteEventRegistry;
+  protected StructColumnVector recordIdColumnVector;
+  private org.apache.orc.Reader.Options readerOptions;
   public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf,
       Reporter reporter) throws IOException {
+    this.init(inputSplit, conf, reporter, Utilities.getVectorizedRowBatchCtx(conf));
+    final Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, (OrcSplit) inputSplit);
+    // Careful with the range here now, we do not want to read the whole base file like deltas.
+    final RecordReader innerReader = reader.rowsOptions(readerOptions.range(offset, length));
+    baseReader = new org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch>() {
+
+      @Override
+      public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+        return innerReader.nextBatch(value);
+      }
+
+      @Override
+      public NullWritable createKey() {
+        return NullWritable.get();
+      }
+
+      @Override
+      public VectorizedRowBatch createValue() {
+        return rbCtx.createVectorizedRowBatch();
+      }
+
+      @Override
+      public long getPos() throws IOException {
+        return 0;
+      }
+
+      @Override
+      public void close() throws IOException {
+        innerReader.close();
+      }
+
+      @Override
+      public float getProgress() throws IOException {
+        return innerReader.getProgress();
+      }
+    };
+    this.vectorizedRowBatchBase = ((RecordReaderImpl) innerReader).createRowBatch();
+  }
+
+  public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, Reporter reporter,
+      org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> baseReader,
+      VectorizedRowBatchCtx rbCtx) throws IOException {
+    this.init(inputSplit, conf, reporter, rbCtx);
+    this.baseReader = baseReader;
+    this.vectorizedRowBatchBase = baseReader.createValue();
+  }
+
+  public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf, Reporter reporter,
+      VectorizedRowBatchCtx rbCtx) throws IOException {
+    this.init(inputSplit, conf, reporter, rbCtx);
+  }
+
+  private void init(InputSplit inputSplit, JobConf conf, Reporter reporter,
+      VectorizedRowBatchCtx rowBatchCtx) throws IOException {
+    this.rbCtx = rowBatchCtx;
     final boolean isAcidRead = HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
     final AcidUtils.AcidOperationalProperties acidOperationalProperties
         = AcidUtils.getAcidOperationalProperties(conf);
@@ -89,28 +147,13 @@ public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf,
     }
     final OrcSplit orcSplit = (OrcSplit) inputSplit;
-    rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
-    reporter.setStatus(orcSplit.toString());
-    Reader reader = OrcInputFormat.createOrcReaderForSplit(conf, orcSplit);
-    Reader.Options readerOptions = OrcInputFormat.createOptionsForReader(conf);
+    readerOptions = OrcInputFormat.createOptionsForReader(conf);
     readerOptions = OrcRawRecordMerger.createEventOptions(readerOptions);
     this.offset = orcSplit.getStart();
     this.length = orcSplit.getLength();
-    // Careful with the range here now, we do not want to read the whole base file like deltas.
-    this.baseReader = reader.rowsOptions(readerOptions.range(offset, length));
-
-    // VectorizedRowBatchBase schema is picked up from the baseReader because the SchemaEvolution
-    // stuff happens at the ORC layer that understands how to map user schema to acid schema.
-    if (this.baseReader instanceof RecordReaderImpl) {
-      this.vectorizedRowBatchBase = ((RecordReaderImpl) this.baseReader).createRowBatch();
-    } else {
-      throw new IOException("Failed to create vectorized row batch for the reader of type "
-          + this.baseReader.getClass().getName());
-    }
-
     int partitionColumnCount = (rbCtx != null) ? rbCtx.getPartitionColumnCount() : 0;
     if (partitionColumnCount > 0) {
       partitionValues = new Object[partitionColumnCount];
@@ -137,6 +180,8 @@ public VectorizedOrcAcidRowBatchReader(InputSplit inputSplit, JobConf conf,
     // delete event on-demand. Caps the memory consumption to (some_const * no. of readers).
this.deleteEventRegistry = new SortMergedDeleteEventRegistry(conf, orcSplit, deleteEventReaderOptions); } + + recordIdColumnVector = new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE, null, null, null); } /** @@ -190,7 +235,7 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti } addPartitionCols = false; } - if (!baseReader.nextBatch(vectorizedRowBatchBase)) { + if (!baseReader.next(null, vectorizedRowBatchBase)) { return false; } } catch (Exception e) { @@ -222,7 +267,8 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti findRecordsWithInvalidTransactionIds(vectorizedRowBatchBase, selectedBitSet); // Case 2- find rows which have been deleted. - this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase, selectedBitSet); + this.deleteEventRegistry.findDeletedRecords(vectorizedRowBatchBase.cols, + vectorizedRowBatchBase.size, selectedBitSet); if (selectedBitSet.cardinality() == vectorizedRowBatchBase.size) { // None of the cases above matched and everything is selected. Hence, we will use the @@ -251,23 +297,33 @@ public boolean next(NullWritable key, VectorizedRowBatch value) throws IOExcepti StructColumnVector payloadStruct = (StructColumnVector) vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW]; // Transfer columnVector objects from base batch to outgoing batch. System.arraycopy(payloadStruct.fields, 0, value.cols, 0, value.getDataColumnCount()); + if (rbCtx != null) { + recordIdColumnVector.fields[0] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]; + recordIdColumnVector.fields[1] = vectorizedRowBatchBase.cols[OrcRecordUpdater.BUCKET]; + recordIdColumnVector.fields[2] = vectorizedRowBatchBase.cols[OrcRecordUpdater.ROW_ID]; + rbCtx.setRecordIdColumnVector(recordIdColumnVector); + } progress = baseReader.getProgress(); return true; } - private void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { - if (batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) { + protected void findRecordsWithInvalidTransactionIds(VectorizedRowBatch batch, BitSet selectedBitSet) { + findRecordsWithInvalidTransactionIds(batch.cols, batch.size, selectedBitSet); + } + + protected void findRecordsWithInvalidTransactionIds(ColumnVector[] cols, int size, BitSet selectedBitSet) { + if (cols[OrcRecordUpdater.CURRENT_TRANSACTION].isRepeating) { // When we have repeating values, we can unset the whole bitset at once // if the repeating value is not a valid transaction. long currentTransactionIdForBatch = ((LongColumnVector) - batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0]; + cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector[0]; if (!validTxnList.isTxnValid(currentTransactionIdForBatch)) { - selectedBitSet.clear(0, batch.size); + selectedBitSet.clear(0, size); } return; } long[] currentTransactionVector = - ((LongColumnVector) batch.cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector; + ((LongColumnVector) cols[OrcRecordUpdater.CURRENT_TRANSACTION]).vector; // Loop through the bits that are set to true and mark those rows as false, if their // current transactions are not valid. for (int setBitIndex = selectedBitSet.nextSetBit(0); @@ -319,15 +375,16 @@ DeleteEventRegistry getDeleteEventRegistry() { * will read the delete delta files and will create their own internal * data structures to maintain record ids of the records that got deleted. 
*/ - static interface DeleteEventRegistry { + protected static interface DeleteEventRegistry { /** * Modifies the passed bitset to indicate which of the rows in the batch * have been deleted. Assumes that the batch.size is equal to bitset size. - * @param batch + * @param cols + * @param size * @param selectedBitSet * @throws IOException */ - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) throws IOException; + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException; /** * The close() method can be called externally to signal the implementing classes @@ -376,29 +433,29 @@ public SortMergedDeleteEventRegistry(JobConf conf, OrcSplit orcSplit, Reader.Opt } @Override - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException { if (!isDeleteRecordAvailable) { return; } long[] originalTransaction = - batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; + cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; long[] bucket = - batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector; + cols[OrcRecordUpdater.BUCKET].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector; long[] rowId = - batch.cols[OrcRecordUpdater.ROW_ID].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector; + cols[OrcRecordUpdater.ROW_ID].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector; // The following repeatedX values will be set, if any of the columns are repeating. long repeatedOriginalTransaction = (originalTransaction != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; long repeatedBucket = (bucket != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector[0]; long repeatedRowId = (rowId != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector[0]; // Get the first valid row in the batch still available. @@ -413,7 +470,7 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) rowId != null ? (int) rowId[firstValidIndex] : repeatedRowId); // Get the last valid row in the batch still available. - int lastValidIndex = selectedBitSet.previousSetBit(batch.size - 1); + int lastValidIndex = selectedBitSet.previousSetBit(size - 1); RecordIdentifier lastRecordIdInBatch = new RecordIdentifier( originalTransaction != null ? 
originalTransaction[lastValidIndex] : repeatedOriginalTransaction, @@ -860,7 +917,7 @@ private boolean isDeleted(long otid, int bucketProperty, long rowId) { } @Override - public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) + public void findDeletedRecords(ColumnVector[] cols, int size, BitSet selectedBitSet) throws IOException { if (rowIds == null || compressedOtids == null) { return; @@ -869,19 +926,19 @@ public void findDeletedRecords(VectorizedRowBatch batch, BitSet selectedBitSet) // check if it is deleted or not. long[] originalTransactionVector = - batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; + cols[OrcRecordUpdater.ORIGINAL_TRANSACTION].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector; long repeatedOriginalTransaction = (originalTransactionVector != null) ? -1 - : ((LongColumnVector) batch.cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; + : ((LongColumnVector) cols[OrcRecordUpdater.ORIGINAL_TRANSACTION]).vector[0]; long[] bucketProperties = - batch.cols[OrcRecordUpdater.BUCKET].isRepeating ? null - : ((LongColumnVector)batch.cols[OrcRecordUpdater.BUCKET]).vector; + cols[OrcRecordUpdater.BUCKET].isRepeating ? null + : ((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector; int repeatedBucketProperty = (bucketProperties != null) ? -1 - : (int)((LongColumnVector) batch.cols[OrcRecordUpdater.BUCKET]).vector[0]; + : (int)((LongColumnVector) cols[OrcRecordUpdater.BUCKET]).vector[0]; long[] rowIdVector = - ((LongColumnVector) batch.cols[OrcRecordUpdater.ROW_ID]).vector; + ((LongColumnVector) cols[OrcRecordUpdater.ROW_ID]).vector; for (int setBitIndex = selectedBitSet.nextSetBit(0); setBitIndex >= 0; diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java index a2725b20e9..885ef83381 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.io.orc; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; @@ -39,7 +41,7 @@ * the non-vectorized ACID reader and moving the data into a vectorized row * batch. 
  */
-class VectorizedOrcAcidRowReader
+public class VectorizedOrcAcidRowReader
     implements org.apache.hadoop.mapred.RecordReader {
   private final AcidInputFormat.RowReader innerReader;
@@ -49,11 +51,14 @@
   private Object[] partitionValues;
   private final ObjectInspector objectInspector;
   private final DataOutputBuffer buffer = new DataOutputBuffer();
+  private final StructColumnVector recordIdColumnVector;
+  private final LongColumnVector transactionColumnVector;
+  private final LongColumnVector bucketColumnVector;
+  private final LongColumnVector rowIdColumnVector;

-  VectorizedOrcAcidRowReader(AcidInputFormat.RowReader inner,
-                             Configuration conf,
-                             VectorizedRowBatchCtx vectorizedRowBatchCtx,
-                             FileSplit split) throws IOException {
+  public VectorizedOrcAcidRowReader(AcidInputFormat.RowReader inner,
+      Configuration conf, VectorizedRowBatchCtx vectorizedRowBatchCtx, FileSplit split)
+      throws IOException {
     this.innerReader = inner;
     this.key = inner.createKey();
     rbCtx = vectorizedRowBatchCtx;
@@ -64,6 +69,12 @@
     }
     this.value = inner.createValue();
     this.objectInspector = inner.getObjectInspector();
+    this.transactionColumnVector = new LongColumnVector();
+    this.bucketColumnVector = new LongColumnVector();
+    this.rowIdColumnVector = new LongColumnVector();
+    this.recordIdColumnVector =
+        new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+            transactionColumnVector, bucketColumnVector, rowIdColumnVector);
   }

   @Override
@@ -81,19 +92,30 @@ public boolean next(NullWritable nullWritable,
     try {
       VectorizedBatchUtil.acidAddRowToBatch(value,
           (StructObjectInspector) objectInspector,
-          vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer);
+          vectorizedRowBatch.size, vectorizedRowBatch, rbCtx, buffer);
+      addRecordId(vectorizedRowBatch.size, key);
+      vectorizedRowBatch.size++;
       while (vectorizedRowBatch.size < vectorizedRowBatch.selected.length &&
           innerReader.next(key, value)) {
         VectorizedBatchUtil.acidAddRowToBatch(value,
             (StructObjectInspector) objectInspector,
-            vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer);
+            vectorizedRowBatch.size, vectorizedRowBatch, rbCtx, buffer);
+        addRecordId(vectorizedRowBatch.size, key);
+        vectorizedRowBatch.size++;
       }
+      rbCtx.setRecordIdColumnVector(recordIdColumnVector);
     } catch (Exception e) {
       throw new IOException("error iterating", e);
     }
     return true;
   }

+  private void addRecordId(int index, RecordIdentifier key) {
+    transactionColumnVector.vector[index] = key.getTransactionId();
+    bucketColumnVector.vector[index] = key.getBucketProperty();
+    rowIdColumnVector.vector[index] = key.getRowId();
+  }
+
   @Override
   public NullWritable createKey() {
     return NullWritable.get();
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
index c21327f517..c8a0415920 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java
@@ -2149,7 +2149,11 @@ private static TreeReader createEncodedTreeReader(TypeDescription schema,
     } else if (batch.hasVectors(columnIndex)) {
       vectors = batch.getColumnVectors(columnIndex);
     } else {
-      throw new AssertionError("Batch has no data for " + columnIndex + ": " + batch);
+      // A struct column can have a null child column
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Batch has no data for " + columnIndex + ": " + batch);
+      }
+      return null;
     }
     // EncodedColumnBatch is already decompressed, we don't really need to pass codec.
@@ -2712,7 +2716,9 @@ public void seek(PositionProvider[] index) throws IOException {
     }
     if (fields != null) {
       for (TreeReader child : fields) {
-        child.seek(index);
+        if (child != null) {
+          child.seek(index);
+        }
       }
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 702be2e16b..8a4f5c87de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.parse;

+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -69,6 +71,8 @@ public void analyzeInternal(ASTNode tree) throws SemanticException {
     if (useSuper) {
       super.analyzeInternal(tree);
     } else {
+      // TODO: remove when this is enabled everywhere
+      HiveConf.setBoolVar(conf, ConfVars.HIVE_VECTORIZATION_ROW_IDENTIFIER_ENABLED, true);
       if (!SessionState.get().getTxnMgr().supportsAcid()) {
         throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TXNMGR.getMsg());
diff --git ql/src/test/queries/clientpositive/llap_acid_fast.q ql/src/test/queries/clientpositive/llap_acid_fast.q
new file mode 100644
index 0000000000..376b19ced1
--- /dev/null
+++ ql/src/test/queries/clientpositive/llap_acid_fast.q
@@ -0,0 +1,49 @@
+set hive.mapred.mode=nonstrict;
+SET hive.vectorized.execution.enabled=true;
+
+SET hive.llap.io.enabled=true;
+
+SET hive.exec.orc.default.buffer.size=32768;
+SET hive.exec.orc.default.row.index.stride=1000;
+SET hive.optimize.index.filter=true;
+set hive.fetch.task.conversion=none;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+DROP TABLE orc_llap_acid_fast;
+
+CREATE TABLE orc_llap_acid_fast (
+    cint INT,
+    cbigint BIGINT,
+    cfloat FLOAT,
+    cdouble DOUBLE)
+partitioned by (csmallint smallint)
+clustered by (cint) into 2 buckets stored as orc
+TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default');
+
+insert into table orc_llap_acid_fast partition (csmallint = 1)
+select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10;
+insert into table orc_llap_acid_fast partition (csmallint = 2)
+select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10;
+insert into table orc_llap_acid_fast partition (csmallint = 3)
+select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10;
+
+explain vectorization only detail
+select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order
+by csmallint, cint;
+select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order
+by csmallint, cint;
+
+insert into table orc_llap_acid_fast partition (csmallint = 1) values (1, 1, 1, 1);
+
+update orc_llap_acid_fast set cbigint = 2 where cint = 1;
+
+explain vectorization only detail
+select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order
+by csmallint, cint;
+select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order
+by csmallint, cint;
+
+DROP TABLE orc_llap_acid_fast;
diff --git ql/src/test/results/clientpositive/llap/llap_acid.q.out ql/src/test/results/clientpositive/llap/llap_acid.q.out
new file mode 100644
index 0000000000..ff89d1d58e
--- /dev/null
+++ 
ql/src/test/results/clientpositive/llap/llap_acid.q.out @@ -0,0 +1,321 @@ +PREHOOK: query: DROP TABLE orc_llap +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE orc_llap +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE orc_llap ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_llap +POSTHOOK: query: CREATE TABLE orc_llap ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_llap +PREHOOK: query: insert into table orc_llap partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: query: insert into table orc_llap partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: insert into table orc_llap partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap@csmallint=2 +POSTHOOK: query: insert into table orc_llap partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap@csmallint=2 +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=2).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true') +PREHOOK: type: ALTERTABLE_PROPERTIES +PREHOOK: Input: default@orc_llap +PREHOOK: Output: default@orc_llap +POSTHOOK: query: alter table orc_llap SET TBLPROPERTIES ('transactional'='true') +POSTHOOK: type: ALTERTABLE_PROPERTIES +POSTHOOK: Input: default@orc_llap +POSTHOOK: Output: default@orc_llap +PREHOOK: query: insert into table orc_llap partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10 
+PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap@csmallint=3 +POSTHOOK: query: insert into table orc_llap partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap@csmallint=3 +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=3).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=3).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=3).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=3).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: explain +select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain +select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: orc_llap + filterExpr: cint is not null (type: boolean) + Statistics: Num rows: 20 Data size: 616 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 19 Data size: 304 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 19 Data size: 304 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: _col1 (type: smallint), _col0 (type: int) + sort order: ++ + Statistics: Num rows: 19 Data size: 304 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col2 (type: bigint) + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 19 Data size: 304 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 19 Data size: 304 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +PREHOOK: Input: default@orc_llap@csmallint=1 +PREHOOK: Input: default@orc_llap@csmallint=2 +PREHOOK: Input: default@orc_llap@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: select cint, csmallint, cbigint from orc_llap where cint is not null order +by 
csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +POSTHOOK: Input: default@orc_llap@csmallint=1 +POSTHOOK: Input: default@orc_llap@csmallint=2 +POSTHOOK: Input: default@orc_llap@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +PREHOOK: query: insert into table orc_llap partition (csmallint = 1) values (1, 1, 1, 1) +PREHOOK: type: QUERY +PREHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: query: insert into table orc_llap partition (csmallint = 1) values (1, 1, 1, 1) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cbigint EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cdouble EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col4, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cfloat EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col3, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap PARTITION(csmallint=1).cint EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: update orc_llap set cbigint = 2 where cint = 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +PREHOOK: Input: default@orc_llap@csmallint=1 +PREHOOK: Input: default@orc_llap@csmallint=2 +PREHOOK: Input: default@orc_llap@csmallint=3 +PREHOOK: Output: default@orc_llap@csmallint=1 +PREHOOK: Output: default@orc_llap@csmallint=2 +PREHOOK: Output: default@orc_llap@csmallint=3 +POSTHOOK: query: update orc_llap set cbigint = 2 where cint = 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +POSTHOOK: Input: default@orc_llap@csmallint=1 +POSTHOOK: Input: default@orc_llap@csmallint=2 +POSTHOOK: Input: default@orc_llap@csmallint=3 +POSTHOOK: Output: default@orc_llap@csmallint=1 +POSTHOOK: Output: default@orc_llap@csmallint=2 +POSTHOOK: Output: default@orc_llap@csmallint=3 +PREHOOK: query: explain +select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain +select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: orc_llap + filterExpr: cint is not null (type: boolean) + Statistics: Num rows: 20 Data size: 616 Basic stats: COMPLETE Column stats: PARTIAL + Filter Operator + predicate: cint is not null (type: boolean) + Statistics: Num rows: 19 Data size: 
304 Basic stats: COMPLETE Column stats: PARTIAL + Select Operator + expressions: cint (type: int), csmallint (type: smallint), cbigint (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 19 Data size: 304 Basic stats: COMPLETE Column stats: PARTIAL + Reduce Output Operator + key expressions: _col1 (type: smallint), _col0 (type: int) + sort order: ++ + Statistics: Num rows: 19 Data size: 304 Basic stats: COMPLETE Column stats: PARTIAL + value expressions: _col2 (type: bigint) + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Reducer 2 + Execution mode: vectorized, llap + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey1 (type: int), KEY.reducesinkkey0 (type: smallint), VALUE._col0 (type: bigint) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 19 Data size: 304 Basic stats: COMPLETE Column stats: PARTIAL + File Output Operator + compressed: false + Statistics: Num rows: 19 Data size: 304 Basic stats: COMPLETE Column stats: PARTIAL + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap +PREHOOK: Input: default@orc_llap@csmallint=1 +PREHOOK: Input: default@orc_llap@csmallint=2 +PREHOOK: Input: default@orc_llap@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: select cint, csmallint, cbigint from orc_llap where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap +POSTHOOK: Input: default@orc_llap@csmallint=1 +POSTHOOK: Input: default@orc_llap@csmallint=2 +POSTHOOK: Input: default@orc_llap@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +1 1 2 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +PREHOOK: query: DROP TABLE orc_llap +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@orc_llap +PREHOOK: Output: default@orc_llap +POSTHOOK: query: DROP TABLE orc_llap +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@orc_llap +POSTHOOK: Output: default@orc_llap diff --git ql/src/test/results/clientpositive/llap/llap_acid_fast.q.out ql/src/test/results/clientpositive/llap/llap_acid_fast.q.out new file mode 100644 index 0000000000..f00a690df4 --- /dev/null +++ ql/src/test/results/clientpositive/llap/llap_acid_fast.q.out @@ -0,0 +1,361 @@ +PREHOOK: query: DROP TABLE orc_llap_acid_fast +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE orc_llap_acid_fast +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE orc_llap_acid_fast ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc 
+TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_llap_acid_fast +POSTHOOK: query: CREATE TABLE orc_llap_acid_fast ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc +TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_llap_acid_fast +PREHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=2 +POSTHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap_acid_fast@csmallint=2 +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=2).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=2).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=2).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=2).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=3 +POSTHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap_acid_fast@csmallint=3 +POSTHOOK: Lineage: 
orc_llap_acid_fast PARTITION(csmallint=3).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=3).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=3).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=3).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: explain vectorization only detail +select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain vectorization only detail +select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +PLAN VECTORIZATION: + enabled: true + enabledConditionsMet: [hive.vectorized.execution.enabled IS true] + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Vertices: + Map 1 + Map Operator Tree: + TableScan Vectorization: + native: true + projectedOutputColumns: [0, 1, 2, 3, 4] + Filter Vectorization: + className: VectorFilterOperator + native: true + predicateExpression: SelectColumnIsNotNull(col 0) -> boolean + Select Vectorization: + className: VectorSelectOperator + native: true + projectedOutputColumns: [0, 4, 1] + Reduce Sink Vectorization: + className: VectorReduceSinkObjectHashOperator + keyColumns: [4, 0] + native: true + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true + valueColumns: [1] + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Map Vectorization: + enabled: true + enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true + groupByVectorOutput: true + inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + allNative: true + usesVectorUDFAdaptor: false + vectorized: true + rowBatchContext: + dataColumnCount: 4 + includeColumns: [0, 1] + dataColumns: cint:int, cbigint:bigint, cfloat:float, cdouble:double + partitionColumnCount: 1 + partitionColumns: csmallint:smallint + Reducer 2 + Execution mode: vectorized, llap + Reduce Vectorization: + enabled: true + enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true + reduceColumnNullOrder: aa + reduceColumnSortOrder: ++ + groupByVectorOutput: true + allNative: false + usesVectorUDFAdaptor: false + vectorized: true + rowBatchContext: + dataColumnCount: 3 + dataColumns: KEY.reducesinkkey0:smallint, KEY.reducesinkkey1:int, VALUE._col0:bigint + partitionColumnCount: 0 + Reduce Operator Tree: + Select Vectorization: + className: VectorSelectOperator + native: true + projectedOutputColumns: [1, 0, 2] + File Sink Vectorization: + className: VectorFileSinkOperator + native: false + + Stage: Stage-0 + Fetch Operator + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap_acid_fast +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=1 
+PREHOOK: Input: default@orc_llap_acid_fast@csmallint=2 +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap_acid_fast +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=2 +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +PREHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 1) values (1, 1, 1, 1) +PREHOOK: type: QUERY +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 1) values (1, 1, 1, 1) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cbigint EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cdouble EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col4, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cfloat EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col3, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cint EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: update orc_llap_acid_fast set cbigint = 2 where cint = 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap_acid_fast +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=1 +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=2 +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=3 +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=1 +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=2 +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=3 +POSTHOOK: query: update orc_llap_acid_fast set cbigint = 2 where cint = 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap_acid_fast +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=2 +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=3 +POSTHOOK: Output: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: Output: default@orc_llap_acid_fast@csmallint=2 +POSTHOOK: Output: default@orc_llap_acid_fast@csmallint=3 +PREHOOK: query: explain vectorization only detail +select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain vectorization only detail +select cint, csmallint, cbigint from orc_llap_acid_fast where cint 
is not null order +by csmallint, cint +POSTHOOK: type: QUERY +PLAN VECTORIZATION: + enabled: true + enabledConditionsMet: [hive.vectorized.execution.enabled IS true] + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) + Vertices: + Map 1 + Map Operator Tree: + TableScan Vectorization: + native: true + projectedOutputColumns: [0, 1, 2, 3, 4] + Filter Vectorization: + className: VectorFilterOperator + native: true + predicateExpression: SelectColumnIsNotNull(col 0) -> boolean + Select Vectorization: + className: VectorSelectOperator + native: true + projectedOutputColumns: [0, 4, 1] + Reduce Sink Vectorization: + className: VectorReduceSinkObjectHashOperator + keyColumns: [4, 0] + native: true + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true + valueColumns: [1] + Execution mode: vectorized, llap + LLAP IO: may be used (ACID table) + Map Vectorization: + enabled: true + enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true + groupByVectorOutput: true + inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + allNative: true + usesVectorUDFAdaptor: false + vectorized: true + rowBatchContext: + dataColumnCount: 4 + includeColumns: [0, 1] + dataColumns: cint:int, cbigint:bigint, cfloat:float, cdouble:double + partitionColumnCount: 1 + partitionColumns: csmallint:smallint + Reducer 2 + Execution mode: vectorized, llap + Reduce Vectorization: + enabled: true + enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true + reduceColumnNullOrder: aa + reduceColumnSortOrder: ++ + groupByVectorOutput: true + allNative: false + usesVectorUDFAdaptor: false + vectorized: true + rowBatchContext: + dataColumnCount: 3 + dataColumns: KEY.reducesinkkey0:smallint, KEY.reducesinkkey1:int, VALUE._col0:bigint + partitionColumnCount: 0 + Reduce Operator Tree: + Select Vectorization: + className: VectorSelectOperator + native: true + projectedOutputColumns: [1, 0, 2] + File Sink Vectorization: + className: VectorFileSinkOperator + native: false + + Stage: Stage-0 + Fetch Operator + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap_acid_fast +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=1 +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=2 +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap_acid_fast +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=2 +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +1 1 2 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 
2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +PREHOOK: query: DROP TABLE orc_llap_acid_fast +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@orc_llap_acid_fast +PREHOOK: Output: default@orc_llap_acid_fast +POSTHOOK: query: DROP TABLE orc_llap_acid_fast +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@orc_llap_acid_fast +POSTHOOK: Output: default@orc_llap_acid_fast diff --git ql/src/test/results/clientpositive/llap_acid_fast.q.out ql/src/test/results/clientpositive/llap_acid_fast.q.out new file mode 100644 index 0000000000..853d556231 --- /dev/null +++ ql/src/test/results/clientpositive/llap_acid_fast.q.out @@ -0,0 +1,315 @@ +PREHOOK: query: DROP TABLE orc_llap_acid_fast +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE orc_llap_acid_fast +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE TABLE orc_llap_acid_fast ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc +TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@orc_llap_acid_fast +POSTHOOK: query: CREATE TABLE orc_llap_acid_fast ( + cint INT, + cbigint BIGINT, + cfloat FLOAT, + cdouble DOUBLE) +partitioned by (csmallint smallint) +clustered by (cint) into 2 buckets stored as orc +TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@orc_llap_acid_fast +PREHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 1) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=2 +POSTHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 2) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble asc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: 
default@orc_llap_acid_fast@csmallint=2 +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=2).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=2).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=2).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=2).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10 +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=3 +POSTHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 3) +select cint, cbigint, cfloat, cdouble from alltypesorc order by cdouble desc limit 10 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +POSTHOOK: Output: default@orc_llap_acid_fast@csmallint=3 +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=3).cbigint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cbigint, type:bigint, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=3).cdouble SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cdouble, type:double, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=3).cfloat SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cfloat, type:float, comment:null), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=3).cint SIMPLE [(alltypesorc)alltypesorc.FieldSchema(name:cint, type:int, comment:null), ] +PREHOOK: query: explain vectorization only detail +select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain vectorization only detail +select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +PLAN VECTORIZATION: + enabled: true + enabledConditionsMet: [hive.vectorized.execution.enabled IS true] + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan Vectorization: + native: true + projectedOutputColumns: [0, 1, 2, 3, 4] + Filter Vectorization: + className: VectorFilterOperator + native: true + predicateExpression: SelectColumnIsNotNull(col 0) -> boolean + Select Vectorization: + className: VectorSelectOperator + native: true + projectedOutputColumns: [0, 4, 1] + Reduce Sink Vectorization: + className: VectorReduceSinkOperator + native: false + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true + nativeConditionsNotMet: hive.execution.engine mr IN [tez, spark] IS false + Execution mode: vectorized + LLAP IO: may be used (ACID table) + Map Vectorization: + enabled: true + enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true + groupByVectorOutput: true + inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + allNative: false + usesVectorUDFAdaptor: false + vectorized: true + rowBatchContext: + dataColumnCount: 4 + 
includeColumns: [0, 1] + dataColumns: cint:int, cbigint:bigint, cfloat:float, cdouble:double + partitionColumnCount: 1 + partitionColumns: csmallint:smallint + Reduce Vectorization: + enabled: false + enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true + enableConditionsNotMet: hive.execution.engine mr IN [tez, spark] IS false + Reduce Operator Tree: + + Stage: Stage-0 + Fetch Operator + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap_acid_fast +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=1 +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=2 +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap_acid_fast +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=2 +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +PREHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 1) values (1, 1, 1, 1) +PREHOOK: type: QUERY +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: query: insert into table orc_llap_acid_fast partition (csmallint = 1) values (1, 1, 1, 1) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cbigint EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cdouble EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col4, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cfloat EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col3, type:string, comment:), ] +POSTHOOK: Lineage: orc_llap_acid_fast PARTITION(csmallint=1).cint EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: update orc_llap_acid_fast set cbigint = 2 where cint = 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap_acid_fast +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=1 +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=2 +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=3 +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=1 +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=2 +PREHOOK: Output: default@orc_llap_acid_fast@csmallint=3 +POSTHOOK: query: update orc_llap_acid_fast set cbigint = 2 where cint = 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: 
default@orc_llap_acid_fast +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=2 +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=3 +POSTHOOK: Output: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: Output: default@orc_llap_acid_fast@csmallint=2 +POSTHOOK: Output: default@orc_llap_acid_fast@csmallint=3 +PREHOOK: query: explain vectorization only detail +select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +POSTHOOK: query: explain vectorization only detail +select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +PLAN VECTORIZATION: + enabled: true + enabledConditionsMet: [hive.vectorized.execution.enabled IS true] + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan Vectorization: + native: true + projectedOutputColumns: [0, 1, 2, 3, 4] + Filter Vectorization: + className: VectorFilterOperator + native: true + predicateExpression: SelectColumnIsNotNull(col 0) -> boolean + Select Vectorization: + className: VectorSelectOperator + native: true + projectedOutputColumns: [0, 4, 1] + Reduce Sink Vectorization: + className: VectorReduceSinkOperator + native: false + nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true + nativeConditionsNotMet: hive.execution.engine mr IN [tez, spark] IS false + Execution mode: vectorized + LLAP IO: may be used (ACID table) + Map Vectorization: + enabled: true + enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true + groupByVectorOutput: true + inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat + allNative: false + usesVectorUDFAdaptor: false + vectorized: true + rowBatchContext: + dataColumnCount: 4 + includeColumns: [0, 1] + dataColumns: cint:int, cbigint:bigint, cfloat:float, cdouble:double + partitionColumnCount: 1 + partitionColumns: csmallint:smallint + Reduce Vectorization: + enabled: false + enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true + enableConditionsNotMet: hive.execution.engine mr IN [tez, spark] IS false + Reduce Operator Tree: + + Stage: Stage-0 + Fetch Operator + +PREHOOK: query: select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +PREHOOK: type: QUERY +PREHOOK: Input: default@orc_llap_acid_fast +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=1 +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=2 +PREHOOK: Input: default@orc_llap_acid_fast@csmallint=3 +#### A masked pattern was here #### +POSTHOOK: query: select cint, csmallint, cbigint from orc_llap_acid_fast where cint is not null order +by csmallint, cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@orc_llap_acid_fast +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=1 +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=2 +POSTHOOK: Input: default@orc_llap_acid_fast@csmallint=3 +#### A masked pattern was here #### +-285355633 1 -1241163445 +-109813638 1 -58941842 +1 1 2 +164554497 1 1161977292 +199879534 1 123351087 +246423894 1 -1645852809 +354670578 1 562841852 +455419170 1 1108177470 +665801232 1 480783141 +708885482 1 -1645852809 +-285355633 2 -1241163445 +-109813638 2 -58941842 +164554497 2 
1161977292 +199879534 2 123351087 +246423894 2 -1645852809 +354670578 2 562841852 +455419170 2 1108177470 +665801232 2 480783141 +708885482 2 -1645852809 +-923308739 3 -1887561756 +-3728 3 -1887561756 +762 3 -1645852809 +6981 3 -1887561756 +253665376 3 NULL +497728223 3 -1887561756 +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +528534767 3 NULL +PREHOOK: query: DROP TABLE orc_llap_acid_fast +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@orc_llap_acid_fast +PREHOOK: Output: default@orc_llap_acid_fast +POSTHOOK: query: DROP TABLE orc_llap_acid_fast +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@orc_llap_acid_fast +POSTHOOK: Output: default@orc_llap_acid_fast