diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 1fd09ebe63..5069130b21 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -168,6 +168,7 @@ minillap.query.files=\
   union_script.q,\
   vector_custom_udf_configure.q,\
   vector_offset_limit.q,\
+  vector_ptf_window_unbounded.q,\
   vector_udf3.q,\
   whroot_external1.q
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 9bb59bc955..7af76b9188 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -210,6 +210,25 @@ public static ColumnVector createColumnVector(TypeInfo typeInfo,
     }
   }
 
+  public static ColumnVector createColumnVector(ColumnVector.Type type, int size, int precision, int scale) {
+    switch (type) {
+    case LONG:
+      return new LongColumnVector(size);
+    case DOUBLE:
+      return new DoubleColumnVector(size);
+    case BYTES:
+      return new BytesColumnVector(size);
+    case DECIMAL:
+      return new DecimalColumnVector(size, precision, scale);
+    case TIMESTAMP:
+      return new TimestampColumnVector(size);
+    case INTERVAL_DAY_TIME:
+      return new IntervalDayTimeColumnVector(size);
+    default:
+      throw new RuntimeException("Unexpected column vector type " + type);
+    }
+  }
+
   /**
    * Iterates thru all the columns in a given row and populates the batch
    * from a given offset
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFGroupBatches.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFGroupBatches.java
index b0340d259d..d8a0932c23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFGroupBatches.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFGroupBatches.java
@@ -19,21 +19,18 @@
 package org.apache.hadoop.hive.ql.exec.vector.ptf;
 
 import java.io.IOException;
+import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow;
-import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
+import org.apache.hadoop.hive.ql.exec.vector.*;
 import org.apache.hadoop.hive.ql.exec.vector.rowbytescontainer.VectorRowBytesContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
 import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -49,6 +46,7 @@
 
   private VectorPTFEvaluatorBase[] evaluators;
   private int[] outputProjectionColumnMap;
+  private TypeInfo[] outputTypeInfos;
   private int[] keyInputColumnMap;
 
   private int bufferedColumnCount;
@@ -92,10 +90,11 @@ public void init(
       TypeInfo[] reducerBatchTypeInfos,
       VectorPTFEvaluatorBase[] evaluators,
       int[] outputProjectionColumnMap, TypeInfo[] outputTypeInfos,
-      int[] keyInputColumnMap, int[] nonKeyInputColumnMap, int[] streamingEvaluatorNums,
+      int[] keyInputColumnMap, int[] orderColumnMap, int[] nonKeyInputColumnMap, int[] streamingEvaluatorNums,
       VectorizedRowBatch overflowBatch) {
 
     this.evaluators = evaluators;
     this.outputProjectionColumnMap = outputProjectionColumnMap;
+    this.outputTypeInfos = outputTypeInfos;
     this.keyInputColumnMap = keyInputColumnMap;
 
     /*
@@ -105,16 +104,23 @@ public void init(
      * We buffer the non-key input columns. And, we buffer any streaming columns that will already
      * have their output values.
      */
+    final int orderColumnCount = orderColumnMap.length;
     final int nonKeyInputColumnCount = nonKeyInputColumnMap.length;
     final int streamingEvaluatorCount = streamingEvaluatorNums.length;
-    bufferedColumnCount = nonKeyInputColumnCount + streamingEvaluatorCount;
+    bufferedColumnCount = orderColumnCount + nonKeyInputColumnCount + streamingEvaluatorCount;
     bufferedColumnMap = new int[bufferedColumnCount];
     bufferedTypeInfos = new TypeInfo[bufferedColumnCount];
 
-    for (int i = 0; i < nonKeyInputColumnCount; i++) {
-      final int columnNum = nonKeyInputColumnMap[i];
+    for (int i = 0; i < orderColumnCount; i++) {
+      final int columnNum = orderColumnMap[i];
       bufferedColumnMap[i] = columnNum;
       bufferedTypeInfos[i] = reducerBatchTypeInfos[columnNum];
     }
+    for (int i = 0; i < nonKeyInputColumnCount; i++) {
+      final int columnNum = nonKeyInputColumnMap[i];
+      final int bufferedMapIndex = orderColumnCount + i;
+      bufferedColumnMap[bufferedMapIndex] = columnNum;
+      bufferedTypeInfos[bufferedMapIndex] = reducerBatchTypeInfos[columnNum];
+    }
     for (int i = 0; i < streamingEvaluatorCount; i++) {
       final int streamingEvaluatorNum = streamingEvaluatorNums[i];
@@ -303,6 +309,91 @@ private void copyPartitionAndOrderColumnsToOverflow(VectorizedRowBatch lastBatch
     }
   }
 
+  public void createColumnVectorsForBatch(VectorPTFOperator vecPTFOperator,
+      VectorizedRowBatch batch) throws HiveException {
+    int count = vecPTFOperator.types.length;
+    batch.cols = new ColumnVector[count];
+    for (int i = 0; i < count; i++) {
+      batch.cols[i] = VectorizedBatchUtil.createColumnVector(vecPTFOperator.types[i], VectorizedRowBatch.DEFAULT_SIZE,
+          (vecPTFOperator.decimalPrecisions != null && vecPTFOperator.decimalPrecisions.length > i)
+              ? vecPTFOperator.decimalPrecisions[i] : 0,
+          (vecPTFOperator.decimalScales != null && vecPTFOperator.decimalScales.length > i)
+              ? vecPTFOperator.decimalScales[i] : 0);
+    }
+  }
+
+  public void setPartitionColumnToBufferedBatch(VectorPTFOperator vecPTFOperator,
+      VectorizedRowBatch batch) throws HiveException {
+    final int count = vecPTFOperator.partitionColumnMap.length;
+    for (int i = 0; i < count; i++) {
+      ColumnVector colVector = batch.cols[vecPTFOperator.partitionColumnMap[i]];
+
+      // Partition columns are repeated -- so we test element 0.
+
+      if (vecPTFOperator.currentPartitionIsNull[i]) {
+        colVector.isNull[0] = true;
+      } else {
+        colVector.noNulls = true;
+        colVector.isNull[0] = false;
+      }
+
+      if (vecPTFOperator.currentPartitionIsNull[i]) {
+        continue;
+      }
+
+      switch (vecPTFOperator.partitionColumnVectorTypes[i]) {
+      case LONG:
+        ((LongColumnVector) colVector).vector[0] = vecPTFOperator.currentPartitionLongs[i];
+        break;
+      case DOUBLE:
+        ((DoubleColumnVector) colVector).vector[0] = vecPTFOperator.currentPartitionDoubles[i];
+        break;
+      case BYTES:
+        {
+          BytesColumnVector byteColVector = (BytesColumnVector) colVector;
+          byteColVector.vector[0] = Arrays.copyOfRange(vecPTFOperator.currentPartitionByteArrays[i], 0,
+              vecPTFOperator.currentPartitionByteLengths[i]);
+          byteColVector.length[0] = vecPTFOperator.currentPartitionByteLengths[i];
+        }
+        break;
+      case DECIMAL:
+        ((DecimalColumnVector) colVector).vector[0].set(vecPTFOperator.currentPartitionDecimals[i]);
+        break;
+      case TIMESTAMP:
+        ((TimestampColumnVector) colVector).set(0, vecPTFOperator.currentPartitionTimestamps[i]);
+        break;
+      case INTERVAL_DAY_TIME:
+        ((IntervalDayTimeColumnVector) colVector).set(0, vecPTFOperator.currentPartitionIntervalDayTimes[i]);
+        break;
+      default:
+        throw new RuntimeException("Unexpected column vector type " + vecPTFOperator.partitionColumnVectorTypes[i]);
+      }
+    }
+  }
+
+  public void forwardBufferedAndSpilledBatches(VectorPTFOperator vecPTFOperator) throws HiveException {
+
+    if (currentBufferedBatchCount == 0) return;
+
+    VectorizedRowBatch newBatch = new VectorizedRowBatch(0);
+    createColumnVectorsForBatch(vecPTFOperator, newBatch);
+    setPartitionColumnToBufferedBatch(vecPTFOperator, newBatch);
+    if (didSpillToDisk) {
+      forwardSpilledBatches(vecPTFOperator, newBatch);
+      didSpillToDisk = false;
+    }
+
+    if (currentBufferedBatchCount > 0) {
+      overflowBatch.reset();
+      createColumnVectorsForBatch(vecPTFOperator, overflowBatch);
+      setPartitionColumnToBufferedBatch(vecPTFOperator, overflowBatch);
+      for (int i = 0; i < currentBufferedBatchCount; i++) {
+        forwardBufferedBatches(vecPTFOperator, i);
+      }
+      currentBufferedBatchCount = 0;
+    }
+  }
+
   public void fillGroupResultsAndForward(VectorPTFOperator vecPTFOperator,
       VectorizedRowBatch lastBatch) throws HiveException {
 
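A minimal, self-contained sketch of how the new createColumnVector(ColumnVector.Type, size, precision, scale) overload above is meant to be driven. This sketch is illustrative only and is not part of the patch; the class name CreateColumnVectorSketch, the method buildEmptyBatch, and the recordedTypes/precisions/scales parameters are hypothetical stand-ins for the types/decimalPrecisions/decimalScales fields the operator captures from its first batch.

// Sketch only: rebuild an empty VectorizedRowBatch from per-column vector types
// recorded earlier, the way VectorPTFGroupBatches.createColumnVectorsForBatch does.
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class CreateColumnVectorSketch {

  static VectorizedRowBatch buildEmptyBatch(ColumnVector.Type[] recordedTypes,
      int[] precisions, int[] scales) {
    VectorizedRowBatch batch = new VectorizedRowBatch(recordedTypes.length);
    for (int i = 0; i < recordedTypes.length; i++) {
      // Precision and scale are only consulted for DECIMAL columns; 0 is a harmless
      // filler for every other vector type.
      batch.cols[i] = VectorizedBatchUtil.createColumnVector(recordedTypes[i],
          VectorizedRowBatch.DEFAULT_SIZE,
          precisions != null ? precisions[i] : 0,
          scales != null ? scales[i] : 0);
    }
    return batch;
  }
}

The patch additionally length-checks decimalPrecisions/decimalScales before indexing them; the sketch keeps only the null check for brevity.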
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFOperator.java
index b1d65287d9..2840e5a794 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ptf/VectorPTFOperator.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
@@ -106,8 +107,8 @@
   private VectorExpression[] orderExpressions;
 
   private ExprNodeDesc[] partitionExprNodeDescs;
-  private int[] partitionColumnMap;
-  private Type[] partitionColumnVectorTypes;
+  public int[] partitionColumnMap;
+  public Type[] partitionColumnVectorTypes;
   private VectorExpression[] partitionExpressions;
 
   private int[] keyInputColumnMap;
@@ -131,14 +132,17 @@
 
   private transient boolean isFirstPartition;
 
-  private transient boolean[] currentPartitionIsNull;
-  private transient long[] currentPartitionLongs;
-  private transient double[] currentPartitionDoubles;
-  private transient byte[][] currentPartitionByteArrays;
-  private transient int[] currentPartitionByteLengths;
-  private transient HiveDecimalWritable[] currentPartitionDecimals;
-  private transient Timestamp[] currentPartitionTimestamps;
-  private transient HiveIntervalDayTime[] currentPartitionIntervalDayTimes;
+  public transient boolean[] currentPartitionIsNull;
+  public transient long[] currentPartitionLongs;
+  public transient double[] currentPartitionDoubles;
+  public transient byte[][] currentPartitionByteArrays;
+  public transient int[] currentPartitionByteLengths;
+  public transient HiveDecimalWritable[] currentPartitionDecimals;
+  public transient Timestamp[] currentPartitionTimestamps;
+  public transient HiveIntervalDayTime[] currentPartitionIntervalDayTimes;
+  public transient int[] decimalPrecisions;
+  public transient int[] decimalScales;
+  public transient ColumnVector.Type[] types;
 
   // For debug tracing: the name of the map or reduce task.
   private transient String taskName;
@@ -202,6 +206,7 @@ public VectorPTFOperator(CompilationOpContext ctx, OperatorDesc conf,
 
     keyInputColumnMap = vectorPTFInfo.getKeyInputColumnMap();
     nonKeyInputColumnMap = vectorPTFInfo.getNonKeyInputColumnMap();
+    types = null;
   }
 
   /**
@@ -289,6 +294,8 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       currentPartitionDecimals = null;
       currentPartitionTimestamps = null;
       currentPartitionIntervalDayTimes = null;
+      decimalPrecisions = null;
+      decimalScales = null;
     } else {
       final int partitionKeyCount = vectorDesc.getPartitionExprNodeDescs().length;
       currentPartitionIsNull = new boolean[partitionKeyCount];
@@ -299,6 +306,8 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       currentPartitionDecimals = new HiveDecimalWritable[partitionKeyCount];
       currentPartitionTimestamps = new Timestamp[partitionKeyCount];
       currentPartitionIntervalDayTimes = new HiveIntervalDayTime[partitionKeyCount];
+      decimalPrecisions = null;
+      decimalScales = null;
     }
 
     evaluators = VectorPTFDesc.getEvaluators(vectorDesc, vectorPTFInfo);
@@ -320,6 +329,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
         outputProjectionColumnMap,
         outputTypeInfos,
         keyInputColumnMap,
+        orderColumnMap,
         nonKeyInputColumnMap,
         streamingEvaluatorNums,
         overflowBatch);
@@ -362,13 +372,30 @@ public void process(Object row, int tag) throws HiveException {
       }
     }
 
-    if (isPartitionOrderBy) {
+    boolean isPartitionKeyChanged = isPartitionChanged(batch);
+    boolean oldIsFirstPartition = isFirstPartition;
+
+    if (isFirstPartition) {
+      int numCols = batch.cols.length;
+      types = new ColumnVector.Type[numCols];
+      decimalPrecisions = new int[numCols];
+      decimalScales = new int[numCols];
+      for (int i = 0; i < numCols; ++i) {
+        types[i] = batch.cols[i].type;
+        if (batch.cols[i] instanceof DecimalColumnVector) {
+          decimalPrecisions[i] = ((DecimalColumnVector)batch.cols[i]).precision;
+          decimalScales[i] = ((DecimalColumnVector)batch.cols[i]).scale;
+        }
+      }
+      isFirstPartition = false;
+    }
+
+    if (isPartitionOrderBy) {
 
       // Check for PARTITION BY key change when we have ORDER BY keys.
-      if (isFirstPartition) {
-        isFirstPartition = false;
+      if (oldIsFirstPartition) {
         setCurrentPartition(batch);
-      } else if (isPartitionChanged(batch)) {
+      } else if (isPartitionKeyChanged) {
         setCurrentPartition(batch);
         groupBatches.resetEvaluators();
       }
@@ -385,7 +412,7 @@ public void process(Object row, int tag) throws HiveException {
 
       // Evaluate the aggregation functions over the group batch.
       groupBatches.evaluateGroupBatch(batch, isLastGroupBatch);
-      if (!isLastGroupBatch) {
+      if (!isLastGroupBatch || (isPartitionOrderBy && (oldIsFirstPartition || !isPartitionKeyChanged))) {
 
         // The group spans a VectorizedRowBatch.  Swap the relevant columns into our batch buffers,
         // or write the batch to temporary storage.
@@ -410,6 +437,7 @@ public void process(Object row, int tag) throws HiveException {
   }
 
   private boolean isPartitionChanged(VectorizedRowBatch batch) {
+    if (partitionColumnMap == null) return false;
 
     final int count = partitionColumnMap.length;
     for (int i = 0; i < count; i++) {
@@ -541,6 +569,9 @@ public void forward(Object row, ObjectInspector rowInspector) throws HiveExcepti
 
   @Override
   protected void closeOp(boolean abort) throws HiveException {
+    if (!abort) {
+      groupBatches.forwardBufferedAndSpilledBatches(this);
+    }
     super.closeOp(abort);
 
     // We do not try to finish and flush an in-progress group because correct values require the
diff --git a/ql/src/test/queries/clientpositive/vector_ptf_window_unbounded.q b/ql/src/test/queries/clientpositive/vector_ptf_window_unbounded.q
new file mode 100644
index 0000000000..674443c1a1
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/vector_ptf_window_unbounded.q
@@ -0,0 +1,10 @@
+set hive.query.results.cache.enabled=false;
+create temporary table test2(id STRING,name STRING,event_dt date) stored as orc;
+
+insert into test2 values ('100','A','2019-08-15'), ('100','A','2019-10-12');
+
+SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY event_dt asc) last_event_dt FROM test2;
+SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY NAME) last_event_dt FROM test2;
+
+SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY event_dt asc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) last_event_dt FROM test2;
+
diff --git a/ql/src/test/results/clientpositive/llap/vector_ptf_window_unbounded.q.out b/ql/src/test/results/clientpositive/llap/vector_ptf_window_unbounded.q.out
new file mode 100644
index 0000000000..e45652e68c
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/vector_ptf_window_unbounded.q.out
@@ -0,0 +1,49 @@
+PREHOOK: query: create temporary table test2(id STRING,name STRING,event_dt date) stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test2
+POSTHOOK: query: create temporary table test2(id STRING,name STRING,event_dt date) stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test2
+PREHOOK: query: insert into test2 values ('100','A','2019-08-15'), ('100','A','2019-10-12')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@test2
+POSTHOOK: query: insert into test2 values ('100','A','2019-08-15'), ('100','A','2019-10-12')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@test2
+POSTHOOK: Lineage: test2.event_dt SCRIPT []
+POSTHOOK: Lineage: test2.id SCRIPT []
+POSTHOOK: Lineage: test2.name SCRIPT []
+PREHOOK: query: SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY event_dt asc) last_event_dt FROM test2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test2
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY event_dt asc) last_event_dt FROM test2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test2
+#### A masked pattern was here ####
+A	2019-08-15	2019-10-12
+A	2019-10-12	2019-10-12
+PREHOOK: query: SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY NAME) last_event_dt FROM test2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test2
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY NAME) last_event_dt FROM test2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test2
+#### A masked pattern was here ####
+A	2019-08-15	2019-10-12
+A	2019-10-12	2019-10-12
+PREHOOK: query: SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY event_dt asc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) last_event_dt FROM test2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test2
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY event_dt asc ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) last_event_dt FROM test2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test2
+#### A masked pattern was here ####
+A	2019-08-15	2019-10-12
+A	2019-10-12	2019-10-12
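As a companion to the VectorPTFGroupBatches.init() change above, here is a small stand-alone sketch (illustrative only; the class and method names are hypothetical) of the buffered-column layout the patched init() builds: ORDER BY columns now occupy the leading slots of bufferedColumnMap and the non-key input columns are shifted up by orderColumnCount, presumably so the order-by values are still available when the buffered batches are eventually forwarded. The streaming-evaluator slots that follow are not touched by the patch and are omitted here.

public class BufferedColumnLayoutSketch {

  // Mirrors the index arithmetic in the patched init(): order columns occupy the
  // leading slots, then non-key input columns follow, offset by orderColumnCount.
  static int[] layoutBufferedColumns(int[] orderColumnMap, int[] nonKeyInputColumnMap) {
    final int orderColumnCount = orderColumnMap.length;
    int[] bufferedColumnMap = new int[orderColumnCount + nonKeyInputColumnMap.length];
    for (int i = 0; i < orderColumnCount; i++) {
      bufferedColumnMap[i] = orderColumnMap[i];
    }
    for (int i = 0; i < nonKeyInputColumnMap.length; i++) {
      bufferedColumnMap[orderColumnCount + i] = nonKeyInputColumnMap[i];
    }
    return bufferedColumnMap;
  }

  public static void main(String[] args) {
    // Example: one order column (2) and two non-key inputs (3, 4) buffer as [2, 3, 4].
    System.out.println(java.util.Arrays.toString(
        layoutBufferedColumns(new int[] {2}, new int[] {3, 4})));
  }
}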