diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 409fc90..1cc0104 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -741,7 +741,8 @@ minillaplocal.query.files=acid_globallimit.q,\
   smb_mapjoin_17.q,\
   groupby_resolution.q,\
   windowing_windowspec2.q,\
-  vectorized_join46.q
+  vectorized_join46.q,\
+  vectorized_multi_output_select.q
 
 encrypted.query.files=encryption_join_unencrypted_tbl.q,\
   encryption_insert_partition_static.q,\
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 3656842..7f646c4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -110,6 +111,11 @@
 
   private boolean useBucketizedHiveInputFormat;
 
+  // Data structures specific for vectorized operators.
+  private int size;
+  private boolean selectedInUse;
+  private int[] selected;
+
   // dummy operator (for not increasing seqId)
   protected Operator(String name, CompilationOpContext cContext) {
     this();
@@ -122,6 +128,8 @@ protected Operator() {
     childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     abortOp = new AtomicBoolean(false);
+    // Initializing data structures for vectorization
+    selected = new int[VectorizedRowBatch.DEFAULT_SIZE];
   }
 
   public Operator(CompilationOpContext cContext) {
@@ -880,6 +888,56 @@ protected long getNextCntr(long cntr) {
 
   protected void forward(Object row, ObjectInspector rowInspector)
       throws HiveException {
+    forward(row, rowInspector, false);
+  }
+
+  protected void forward(Object row, ObjectInspector rowInspector, boolean isVectorized)
+      throws HiveException {
+    if (isVectorized && getNumChild() > 1) {
+      vectorForward((VectorizedRowBatch) row, rowInspector);
+      return;
+    }
+    baseForward(row, rowInspector);
+  }
+
+  private void vectorForward(VectorizedRowBatch vrg, ObjectInspector rowInspector)
+      throws HiveException {
+    runTimeNumRows++;
+    if (getDone()) {
+      return;
+    }
+
+    // Data structures to store original values
+    size = vrg.size;
+    selectedInUse = vrg.selectedInUse;
+    if (vrg.selectedInUse) {
+      System.arraycopy(vrg.selected, 0, selected, 0, size);
+    }
+
+    int childrenDone = 0;
+    for (int i = 0; i < childOperatorsArray.length; i++) {
+      Operator<? extends OperatorDesc> o = childOperatorsArray[i];
+      if (o.getDone()) {
+        childrenDone++;
+      } else {
+        o.process(vrg, childOperatorsTag[i]);
+        // Restore original values
+        vrg.size = size;
+        vrg.selectedInUse = selectedInUse;
+        if (vrg.selectedInUse) {
+          System.arraycopy(selected, 0, vrg.selected, 0, size);
+        }
+      }
+    }
+
+    // if all children are done, this operator is also done
+    if (childrenDone != 0 && childrenDone == childOperatorsArray.length) {
+      setDone(true);
+    }
+  }
+
+  private void baseForward(Object row, ObjectInspector rowInspector)
+      throws HiveException {
     runTimeNumRows++;
     if (getDone()) {
      return;
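Reviewer note (not part of the patch): vectorForward above exists because a VectorizedRowBatch is shared by reference with every child operator. A child such as VectorFilterOperator narrows vrg.selected and vrg.size in place, so without the snapshot-and-restore the second child would see the first child's filtered view instead of the original batch. The snapshot buffer is allocated once per operator at VectorizedRowBatch.DEFAULT_SIZE, so no per-batch allocation is added. Below is a minimal standalone sketch of the hazard and the fix; the Batch class and both "children" are invented stand-ins, not Hive code.

public class ForwardRestoreSketch {
  static final int DEFAULT_SIZE = 1024;

  // Stand-in for VectorizedRowBatch's selection state.
  static class Batch {
    int size;
    boolean selectedInUse;
    int[] selected = new int[DEFAULT_SIZE];
  }

  // A "child" that, like VectorFilterOperator, narrows the selection in place.
  static void filterChild(Batch b) {
    int newSize = 0;
    for (int i = 0; i < b.size; i++) {
      int row = b.selectedInUse ? b.selected[i] : i;
      if (row % 2 == 0) {        // keep only even row indexes
        b.selected[newSize++] = row;
      }
    }
    b.selectedInUse = true;
    b.size = newSize;
  }

  // A "child" that just observes the rows it was given.
  static int countingChild(Batch b) {
    return b.size;
  }

  public static void main(String[] args) {
    Batch b = new Batch();
    b.size = 8;
    b.selectedInUse = false;

    // Snapshot, as vectorForward does before calling each child.
    int size = b.size;
    boolean selectedInUse = b.selectedInUse;
    int[] selected = new int[DEFAULT_SIZE];
    if (selectedInUse) {
      System.arraycopy(b.selected, 0, selected, 0, size);
    }

    filterChild(b);  // mutates the shared batch: size drops from 8 to 4

    // Without this restore, the second child would see only 4 rows.
    b.size = size;
    b.selectedInUse = selectedInUse;
    if (selectedInUse) {
      System.arraycopy(selected, 0, b.selected, 0, size);
    }

    System.out.println("second child sees " + countingChild(b) + " rows"); // 8
  }
}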
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index 17f2efb..6ea8260 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.VectorTableScanDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -69,6 +70,8 @@
   // insiderView will tell this TableScan is inside a view or not.
   private transient boolean insideView;
 
+  private transient boolean vectorized;
+
   private String defaultPartitionName;
 
   /**
@@ -109,7 +112,7 @@ public String getSchemaEvolutionColumnsTypes() {
   @Override
   public void process(Object row, int tag) throws HiveException {
     if (rowLimit >= 0) {
-      if (row instanceof VectorizedRowBatch) {
+      if (vectorized) {
         VectorizedRowBatch batch = (VectorizedRowBatch) row;
         if (currCount >= rowLimit) {
           setDone(true);
@@ -127,7 +130,7 @@ public void process(Object row, int tag) throws HiveException {
     if (conf != null && conf.isGatherStats()) {
       gatherStats(row);
     }
-    forward(row, inputObjInspectors[tag]);
+    forward(row, inputObjInspectors[tag], vectorized);
   }
 
   // Change the table partition for collecting stats
@@ -258,6 +261,8 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     defaultPartitionName = HiveConf.getVar(hconf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);
     currentStat = null;
     stats = new HashMap<String, Stat>();
+
+    vectorized = conf.isVectorized();
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
index 2bf6ac5..2c433f7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java
@@ -130,6 +130,7 @@ public void process(Object data, int tag) throws HiveException {
       throw new HiveException(e);
     }
 
-    forward(data, rowInspector);
+    forward(data, rowInspector, true);
   }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
index fd885a9..fdd5aab 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
@@ -120,7 +120,7 @@ public void process(Object row, int tag) throws HiveException {
       // All are selected, do nothing
     }
     if (vrg.size > 0) {
-      forward(vrg, null);
+      forward(vrg, null, true);
     }
 
     // Restore the original selected vector
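Reviewer note (not part of the patch): TableScanOperator now asks the compiled plan (conf.isVectorized(), populated by the Vectorizer further down in this patch) whether its rows are batches, instead of testing each incoming row with instanceof, and it threads the same flag into forward(...) so the batch-aware fan-out path is only taken for vectorized scans. A toy comparison of the two dispatch styles, with invented names; this is a sketch of the idea, not Hive code.

public class InstanceofVsFlag {
  interface Row {}
  static class Batch implements Row { int size = 1024; }

  // Before: every call on the hot path pays a type test.
  static long rowsIn(Row row) {
    return (row instanceof Batch) ? ((Batch) row).size : 1;
  }

  // After: the decision is made once, at operator init time.
  static final boolean VECTORIZED = true; // would come from conf.isVectorized()
  static long rowsInWithFlag(Row row) {
    return VECTORIZED ? ((Batch) row).size : 1;
  }

  public static void main(String[] args) {
    Row batch = new Batch();
    System.out.println(rowsIn(batch));         // 1024
    System.out.println(rowsInWithFlag(batch)); // 1024, no per-row instanceof
  }
}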
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 642dd46..613a31a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -1057,7 +1057,7 @@ private void writeSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow
       for (int i = 0; i < aggregators.length; ++i) {
         forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i));
       }
-      forward(forwardCache, outputObjInspector);
+      forward(forwardCache, outputObjInspector, false);
     } else {
       // Output keys and aggregates into the output batch.
       for (int i = 0; i < outputKeyLength; ++i) {
@@ -1097,7 +1097,7 @@ private void writeGroupRow(VectorAggregationBufferRow agg, DataOutputBuffer buff
   }
 
   private void flushOutput() throws HiveException {
-    forward(outputBatch, null);
+    forward(outputBatch, null, true);
     outputBatch.reset();
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java
index ea00af3..b37dd05 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorLimitOperator.java
@@ -75,7 +75,7 @@ public void process(Object row, int tag) throws HiveException {
         batch.selected[i] = batch.selected[skipSize + i];
       }
     }
-    forward(row, inputObjInspectors[tag]);
+    forward(row, inputObjInspectors[tag], true);
     currCount += batch.size;
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
index bcde25f..b2c8684 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
@@ -126,7 +126,7 @@ protected void internalForward(Object row, ObjectInspector outputOI) throws Hive
   }
 
   private void flushOutput() throws HiveException {
-    forward(outputBatch, null);
+    forward(outputBatch, null, true);
     outputBatch.reset();
   }
 
@@ -185,4 +185,5 @@
   public VectorizationContext getOuputVectorizationContext() {
     return vOutContext;
   }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
index f8c4223..0473f14 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
@@ -307,7 +307,7 @@ protected void internalForward(Object row, ObjectInspector outputOI) throws Hive
   }
 
   private void flushOutput() throws HiveException {
-    forward(outputBatch, null);
+    forward(outputBatch, null, true);
     outputBatch.reset();
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
index 5c490ef..17ccf21 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
@@ -115,7 +115,7 @@ public void process(Object row, int tag) throws HiveException {
 
     // Just forward the row as is
     if (conf.isSelStarNoCompute()) {
-      forward(row, inputObjInspectors[tag]);
+      forward(row, inputObjInspectors[tag], true);
       return;
    }
 
@@ -134,7 +134,7 @@ public void process(Object row, int tag) throws HiveException {
     int originalProjectionSize = vrg.projectionSize;
     vrg.projectionSize = projectedOutputColumns.length;
     vrg.projectedColumns = this.projectedOutputColumns;
-    forward(vrg, outputObjInspector);
+    forward(vrg, outputObjInspector, true);
 
     // Revert the projected columns back, because vrg will be re-used.
     vrg.projectionSize = originalProjectionSize;
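Reviewer note (not part of the patch): the forward(..., true) call sites above all follow the same discipline the new vectorForward enforces: whoever temporarily rewrites shared batch state must put it back, because the batch object is reused. VectorSelectOperator is the clearest case, swapping in its output projection, forwarding, then reverting. A standalone sketch of that swap-forward-restore idiom follows; the types and field names are simplified stand-ins, not Hive code.

public class ProjectionSwapSketch {
  // Stand-in for VectorizedRowBatch's projection metadata.
  static class Batch {
    int projectionSize;
    int[] projectedColumns;
  }

  // Stand-in for forwarding to a child operator.
  static void forward(Batch b) {
    System.out.println("child sees " + b.projectionSize + " projected columns");
  }

  static final int[] projectedOutputColumns = {2, 5};

  static void processLikeVectorSelect(Batch b) {
    // Save the caller's projection, as VectorSelectOperator.process does.
    int originalProjectionSize = b.projectionSize;
    int[] originalProjectedColumns = b.projectedColumns;

    b.projectionSize = projectedOutputColumns.length;
    b.projectedColumns = projectedOutputColumns;
    forward(b);

    // Revert: the same Batch instance is reused by the caller.
    b.projectionSize = originalProjectionSize;
    b.projectedColumns = originalProjectedColumns;
  }

  public static void main(String[] args) {
    Batch b = new Batch();
    b.projectionSize = 6;
    b.projectedColumns = new int[] {0, 1, 2, 3, 4, 5};
    processLikeVectorSelect(b);                          // child sees 2 columns
    System.out.println("caller still sees " + b.projectionSize); // 6
  }
}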
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 1c20d93..bab5ee4 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -635,7 +635,7 @@ public void forwardBigTableBatch(VectorizedRowBatch batch) throws HiveException
     batch.projectionSize = outputProjection.length;
     batch.projectedColumns = outputProjection;
 
-    forward(batch, null);
+    forward(batch, null, true);
 
     // Revert the projected columns back, because batch can be re-used by our parent operators.
     batch.projectionSize = originalProjectionSize;
@@ -647,7 +647,7 @@ public void forwardBigTableBatch(VectorizedRowBatch batch) throws HiveException
    * Forward the overflow batch and reset the batch.
    */
   protected void forwardOverflow() throws HiveException {
-    forward(overflowBatch, null);
+    forward(overflowBatch, null, true);
     overflowBatch.reset();
     maybeCheckInterrupt();
   }
@@ -664,7 +664,7 @@ private void maybeCheckInterrupt() throws HiveException {
    * Forward the overflow batch, but do not reset the batch.
    */
   private void forwardOverflowNoReset() throws HiveException {
-    forward(overflowBatch, null);
+    forward(overflowBatch, null, true);
   }
 
   /*
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 933e47d..5f442a6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -1107,6 +1107,12 @@ private void vectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskCo
     HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
     ogw.startWalking(topNodes, nodeOutput);
 
+    for (Node topNode : topNodes) {
+      if (topNode instanceof TableScanOperator) {
+        ((TableScanOperator) topNode).getConf().setVectorized(true);
+      }
+    }
+
     vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames());
 
     vectorTaskColumnInfo.transferToBaseWork(mapWork);
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 570bd6b..d1c8690 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -102,6 +102,8 @@
   private boolean isAcidTable;
 
+  private boolean vectorized;
+
   private AcidUtils.AcidOperationalProperties acidOperationalProperties = null;
 
   private transient TableSample tableSample;
@@ -444,4 +446,12 @@ public TableScanOperatorExplainVectorization getTableScanVectorization() {
     }
     return new TableScanOperatorExplainVectorization(this, vectorDesc);
   }
+
+  public void setVectorized(boolean vectorized) {
+    this.vectorized = vectorized;
+  }
+
+  public boolean isVectorized() {
+    return vectorized;
+  }
 }
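Reviewer note (not part of the patch): the vectorized flag travels from compile time to run time as follows: Vectorizer.vectorizeMapWork marks each TableScanDesc after the vectorization walk succeeds, the desc is serialized into the task plan, and TableScanOperator.initializeOp caches it into the transient field used on the hot path. A minimal model of that marking pass, with simplified stand-in types (not Hive code):

import java.util.ArrayList;
import java.util.List;

public class MarkVectorizedSketch {
  static class Desc { boolean vectorized; }          // stand-in for TableScanDesc
  static class Op { Desc conf = new Desc(); }        // stand-in for Operator
  static class TableScanOp extends Op {}             // stand-in for TableScanOperator

  // Mirrors the loop added to vectorizeMapWork: only top-level table scans
  // of a successfully vectorized MapWork get the flag.
  static void markTableScans(List<Op> topNodes) {
    for (Op topNode : topNodes) {
      if (topNode instanceof TableScanOp) {
        topNode.conf.vectorized = true;
      }
    }
  }

  public static void main(String[] args) {
    List<Op> tops = new ArrayList<>();
    tops.add(new TableScanOp());
    markTableScans(tops);
    System.out.println(tops.get(0).conf.vectorized); // true
  }
}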
diff --git ql/src/test/queries/clientpositive/vectorized_multi_output_select.q ql/src/test/queries/clientpositive/vectorized_multi_output_select.q
new file mode 100644
index 0000000..e768a5d
--- /dev/null
+++ ql/src/test/queries/clientpositive/vectorized_multi_output_select.q
@@ -0,0 +1,28 @@
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask.size=3000;
+set hive.strict.checks.cartesian.product=false;
+set hive.merge.nway.joins=false;
+set hive.vectorized.execution.enabled=true;
+
+explain
+select * from (
+  select count(*) as h8_30_to_9
+  from src
+  join src1 on src.key = src1.key
+  where src1.value = "val_278") s1
+join (
+  select count(*) as h9_to_9_30
+  from src
+  join src1 on src.key = src1.key
+  where src1.value = "val_255") s2;
+
+select * from (
+  select count(*) as h8_30_to_9
+  from src
+  join src1 on src.key = src1.key
+  where src1.value = "val_278") s1
+join (
+  select count(*) as h9_to_9_30
+  from src
+  join src1 on src.key = src1.key
+  where src1.value = "val_255") s2;
diff --git ql/src/test/results/clientpositive/llap/vectorized_multi_output_select.q.out ql/src/test/results/clientpositive/llap/vectorized_multi_output_select.q.out
new file mode 100644
index 0000000..f744eb6
--- /dev/null
+++ ql/src/test/results/clientpositive/llap/vectorized_multi_output_select.q.out
@@ -0,0 +1,201 @@
+Warning: Map Join MAPJOIN[43][bigTable=?] in task 'Reducer 2' is a cross product
+PREHOOK: query: explain
+select * from (
+  select count(*) as h8_30_to_9
+  from src
+  join src1 on src.key = src1.key
+  where src1.value = "val_278") s1
+join (
+  select count(*) as h9_to_9_30
+  from src
+  join src1 on src.key = src1.key
+  where src1.value = "val_255") s2
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from (
+  select count(*) as h8_30_to_9
+  from src
+  join src1 on src.key = src1.key
+  where src1.value = "val_278") s1
+join (
+  select count(*) as h9_to_9_30
+  from src
+  join src1 on src.key = src1.key
+  where src1.value = "val_255") s2
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Map 1 <- Map 4 (BROADCAST_EDGE), Map 5 (BROADCAST_EDGE)
+        Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 3 (BROADCAST_EDGE)
+        Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+                        input vertices:
+                          1 Map 4
+                        Statistics: Num rows: 4 Data size: 32 Basic stats: COMPLETE Column stats: COMPLETE
+                        Group By Operator
+                          aggregations: count()
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                          Reduce Output Operator
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                            value expressions: _col0 (type: bigint)
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 _col0 (type: string)
+                          1 _col0 (type: string)
+                        input vertices:
+                          1 Map 5
+                        Statistics: Num rows: 4 Data size: 32 Basic stats: COMPLETE Column stats: COMPLETE
+                        Group By Operator
+                          aggregations: count()
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                          Reduce Output Operator
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                            value expressions: _col0 (type: bigint)
+            Execution mode: vectorized, llap
+            LLAP IO: no inputs
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: ((value = 'val_278') and key is not null) (type: boolean)
+                    Statistics: Num rows: 2 Data size: 350 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: vectorized, llap
+            LLAP IO: no inputs
+        Map 5 
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 25 Data size: 4375 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: ((value = 'val_255') and key is not null) (type: boolean)
+                    Statistics: Num rows: 2 Data size: 350 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 2 Data size: 354 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: vectorized, llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                Map Join Operator
+                  condition map:
+                       Inner Join 0 to 1
+                  keys:
+                    0 
+                    1 
+                  outputColumnNames: _col0, _col1
+                  input vertices:
+                    1 Reducer 3
+                  Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 3 
+            Execution mode: vectorized, llap
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                Reduce Output Operator
+                  sort order: 
+                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                  value expressions: _col0 (type: bigint)
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+Warning: Map Join MAPJOIN[43][bigTable=?] in task 'Reducer 2' is a cross product
+PREHOOK: query: select * from (
+  select count(*) as h8_30_to_9
+  from src
+  join src1 on src.key = src1.key
+  where src1.value = "val_278") s1
+join (
+  select count(*) as h9_to_9_30
+  from src
+  join src1 on src.key = src1.key
+  where src1.value = "val_255") s2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@src1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from (
+  select count(*) as h8_30_to_9
+  from src
+  join src1 on src.key = src1.key
+  where src1.value = "val_278") s1
+join (
+  select count(*) as h9_to_9_30
+  from src
+  join src1 on src.key = src1.key
+  where src1.value = "val_255") s2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@src1
+#### A masked pattern was here ####
+2	2