diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index 3efc1ac..2d8117f 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -264,6 +264,7 @@ minitez.query.files.shared=acid_globallimit.q,\ vector_distinct_2.q,\ vector_elt.q,\ vector_groupby_3.q,\ + vector_groupby_mapjoin.q,\ vector_groupby_reduce.q,\ vector_grouping_sets.q,\ vector_if_expr.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index a842649..e10800f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -338,6 +338,8 @@ public Vectorizer() { String[] scratchTypeNameArray; + Set> nonVectorizedOps; + VectorTaskColumnInfo() { partitionColumnCount = 0; } @@ -355,6 +357,13 @@ public void setScratchTypeNameArray(String[] scratchTypeNameArray) { this.scratchTypeNameArray = scratchTypeNameArray; } + public void setNonVectorizedOps(Set> nonVectorizedOps) { + this.nonVectorizedOps = nonVectorizedOps; + } + public Set> getNonVectorizedOps() { + return nonVectorizedOps; + } + public void transferToBaseWork(BaseWork baseWork) { String[] columnNameArray = columnNames.toArray(new String[0]); @@ -701,6 +710,7 @@ private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTask } } } + vectorTaskColumnInfo.setNonVectorizedOps(vnp.getNonVectorizedOps()); return true; } @@ -819,6 +829,7 @@ private boolean validateReduceWork(ReduceWork reduceWork, } } } + vectorTaskColumnInfo.setNonVectorizedOps(vnp.getNonVectorizedOps()); return true; } @@ -863,6 +874,14 @@ private void vectorizeReduceWork(ReduceWork reduceWork, private final MapWork mapWork; private final boolean isTez; + // Children of Vectorized GROUPBY that outputs rows instead of vectorized 
row batches. + protected final Set> nonVectorizedOps = + new HashSet>(); + + public Set> getNonVectorizedOps() { + return nonVectorizedOps; + } + public MapWorkValidationNodeProcessor(MapWork mapWork, boolean isTez) { this.mapWork = mapWork; this.isTez = isTez; @@ -873,7 +892,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { for (Node n : stack) { Operator op = (Operator) n; - if (nonVectorizableChildOfGroupBy(op)) { + if (nonVectorizedOps.contains(op)) { return new Boolean(true); } boolean ret; @@ -886,6 +905,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized."); return new Boolean(false); } + // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't + // vectorize the operators below it. + if (isVectorizedGroupByThatOutputsRows(op)) { + addOperatorChildrenToSet(op, nonVectorizedOps); + return new Boolean(true); + } } return new Boolean(true); } @@ -893,12 +918,24 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, class ReduceWorkValidationNodeProcessor implements NodeProcessor { + // Children of Vectorized GROUPBY that outputs rows instead of vectorized row batches. + protected final Set> nonVectorizedOps = + new HashSet>(); + + public Set> getNonVectorizeOps() { + return nonVectorizedOps; + } + + public Set> getNonVectorizedOps() { + return nonVectorizedOps; + } + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { for (Node n : stack) { Operator op = (Operator) n; - if (nonVectorizableChildOfGroupBy(op)) { + if (nonVectorizedOps.contains(op)) { return new Boolean(true); } boolean ret = validateReduceWorkOperator(op); @@ -906,6 +943,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized."); return new Boolean(false); } + // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't + // vectorize the operators below it. + if (isVectorizedGroupByThatOutputsRows(op)) { + addOperatorChildrenToSet(op, nonVectorizedOps); + return new Boolean(true); + } } return new Boolean(true); } @@ -918,7 +961,10 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, // The vectorization context for the Map or Reduce task. protected VectorizationContext taskVectorizationContext; - VectorizationNodeProcessor() { + protected final Set> nonVectorizedOps; + + VectorizationNodeProcessor(Set> nonVectorizedOps) { + this.nonVectorizedOps = nonVectorizedOps; } public String[] getVectorScratchColumnTypeNames() { @@ -997,7 +1043,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez, VectorTaskColumnInfo vectorTaskColumnInfo) { - super(); + super(vectorTaskColumnInfo.getNonVectorizedOps()); this.mWork = mWork; this.vectorTaskColumnInfo = vectorTaskColumnInfo; this.isTez = isTez; @@ -1008,6 +1054,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { Operator op = (Operator) nd; + if (nonVectorizedOps.contains(op)) { + return null; + } VectorizationContext vContext = null; @@ -1031,16 +1080,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + " using vectorization context" + vContext.toString()); } - // When Vectorized GROUPBY outputs rows instead of vectorized row batchs, we don't - // vectorize the operators below it. - if (nonVectorizableChildOfGroupBy(op)) { - // No need to vectorize - if (!opsDone.contains(op)) { - opsDone.add(op); - } - return null; - } - Operator vectorOp = doVectorize(op, vContext, isTez); if (LOG.isDebugEnabled()) { @@ -1070,7 +1109,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) { - super(); + super(vectorTaskColumnInfo.getNonVectorizedOps()); this.vectorTaskColumnInfo = vectorTaskColumnInfo; rootVectorOp = null; this.isTez = isTez; @@ -1081,6 +1120,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { Operator op = (Operator) nd; + if (nonVectorizedOps.contains(op)) { + return null; + } VectorizationContext vContext = null; @@ -1110,16 +1152,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, assert vContext != null; LOG.info("ReduceWorkVectorizationNodeProcessor process operator " + op.getName() + " using vectorization context" + vContext.toString()); - // When Vectorized GROUPBY outputs rows instead of vectorized row batchs, we don't - // vectorize the operators below it. 
- if (nonVectorizableChildOfGroupBy(op)) { - // No need to vectorize - if (!opsDone.contains(op)) { - opsDone.add(op); - } - return null; - } - Operator vectorOp = doVectorize(op, vContext, isTez); if (LOG.isDebugEnabled()) { @@ -1267,20 +1299,24 @@ boolean validateReduceWorkOperator(Operator op) { return ret; } - public Boolean nonVectorizableChildOfGroupBy(Operator op) { - Operator currentOp = op; - while (currentOp.getParentOperators().size() > 0) { - currentOp = currentOp.getParentOperators().get(0); - if (currentOp.getType().equals(OperatorType.GROUPBY)) { - GroupByDesc desc = (GroupByDesc)currentOp.getConf(); - boolean isVectorOutput = desc.getVectorDesc().isVectorOutput(); - if (isVectorOutput) { - // This GROUP BY does vectorize its output. - return false; - } - return true; + private void addOperatorChildrenToSet(Operator op, + Set> nonVectorizedOps) { + for (Operator childOp : op.getChildOperators()) { + if (!nonVectorizedOps.contains(childOp)) { + nonVectorizedOps.add(childOp); + addOperatorChildrenToSet(childOp, nonVectorizedOps); } } + } + + // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't + // vectorize the operators below it. 
+ private Boolean isVectorizedGroupByThatOutputsRows(Operator op) + throws SemanticException { + if (op.getType().equals(OperatorType.GROUPBY)) { + GroupByDesc desc = (GroupByDesc) op.getConf(); + return !desc.getVectorDesc().isVectorOutput(); + } return false; } diff --git ql/src/test/queries/clientpositive/vector_groupby_mapjoin.q ql/src/test/queries/clientpositive/vector_groupby_mapjoin.q new file mode 100644 index 0000000..a3cec04 --- /dev/null +++ ql/src/test/queries/clientpositive/vector_groupby_mapjoin.q @@ -0,0 +1,22 @@ +set hive.mapred.mode=nonstrict; +set hive.explain.user=true; +SET hive.vectorized.execution.enabled = true; +set hive.fetch.task.conversion=none; +SET hive.auto.convert.join=true; +SET hive.auto.convert.join.noconditionaltask=true; +SET hive.auto.convert.join.noconditionaltask.size=1000000000; +set hive.exec.dynamic.partition.mode=nonstrict; + +-- HIVE-12738 -- We are checking if a MapJoin after a GroupBy will work properly. +explain +select * +from src +where not key in +(select key from src) +order by key; + +select * +from src +where not key in +(select key from src) +order by key; \ No newline at end of file diff --git ql/src/test/results/clientpositive/tez/vector_groupby_mapjoin.q.out ql/src/test/results/clientpositive/tez/vector_groupby_mapjoin.q.out new file mode 100644 index 0000000..dedcec8 --- /dev/null +++ ql/src/test/results/clientpositive/tez/vector_groupby_mapjoin.q.out @@ -0,0 +1,125 @@ +Warning: Map Join MAPJOIN[28][bigTable=?] in task 'Reducer 3' is a cross product +PREHOOK: query: -- HIVE-12738 -- We are checking if a MapJoin after a GroupBy will work properly. +explain +select * +from src +where not key in +(select key from src) +order by key +PREHOOK: type: QUERY +POSTHOOK: query: -- HIVE-12738 -- We are checking if a MapJoin after a GroupBy will work properly. +explain +select * +from src +where not key in +(select key from src) +order by key +POSTHOOK: type: QUERY +Plan optimized by CBO. 
+ +Vertex dependency in root stage +Reducer 3 <- Map 1 (BROADCAST_EDGE), Map 2 (SIMPLE_EDGE), Map 5 (BROADCAST_EDGE) +Reducer 4 <- Reducer 3 (SIMPLE_EDGE) + +Stage-0 + Fetch Operator + limit:-1 + Stage-1 + Reducer 4 vectorized + File Output Operator [FS_34] + compressed:false + Statistics:Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + table:{"input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat","serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"} + Select Operator [OP_33] + | outputColumnNames:["_col0","_col1"] + | Statistics:Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + |<-Reducer 3 [SIMPLE_EDGE] vectorized + Reduce Output Operator [RS_22] + key expressions:_col0 (type: string) + sort order:+ + Statistics:Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + value expressions:_col1 (type: string) + Select Operator [SEL_21] + outputColumnNames:["_col0","_col1"] + Statistics:Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + Filter Operator [FIL_20] + predicate:_col3 is null (type: boolean) + Statistics:Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + Map Join Operator [MAPJOIN_29] + | condition map:[{"":"Left Outer Join0 to 1"}] + | HybridGraceHashJoin:true + | keys:{"Reducer 3":"_col0 (type: string)","Map 5":"_col0 (type: string)"} + | outputColumnNames:["_col0","_col1","_col3"] + | Statistics:Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE + |<-Map 5 [BROADCAST_EDGE] + | Reduce Output Operator [RS_18] + | key expressions:_col0 (type: string) + | Map-reduce partition columns:_col0 (type: string) + | sort order:+ + | Statistics:Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + | Select Operator [SEL_12] + | outputColumnNames:["_col0"] + | Statistics:Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column 
stats: NONE + | TableScan [TS_11] + | alias:src + | Statistics:Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + |<-Map Join Operator [MAPJOIN_28] + | condition map:[{"":"Inner Join 0 to 1"}] + | keys:{} + | outputColumnNames:["_col0","_col1"] + | Statistics:Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE + |<-Map 1 [BROADCAST_EDGE] + | Reduce Output Operator [RS_14] + | sort order: + | Statistics:Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + | value expressions:_col0 (type: string), _col1 (type: string) + | Select Operator [SEL_1] + | outputColumnNames:["_col0","_col1"] + | Statistics:Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + | TableScan [TS_0] + | alias:src + | Statistics:Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + |<-Select Operator [SEL_10] + Statistics:Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Filter Operator [FIL_9] + predicate:(_col0 = 0) (type: boolean) + Statistics:Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Group By Operator [OP_32] + | aggregations:["count(VALUE._col0)"] + | outputColumnNames:["_col0"] + | Statistics:Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + |<-Map 2 [SIMPLE_EDGE] + Reduce Output Operator [RS_6] + sort order: + Statistics:Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions:_col0 (type: bigint) + Group By Operator [GBY_5] + aggregations:["count()"] + outputColumnNames:["_col0"] + Statistics:Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator [SEL_4] + Statistics:Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Filter Operator [FIL_26] + predicate:key is null (type: boolean) + Statistics:Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + TableScan [TS_2] + alias:src + Statistics:Num rows: 500 Data size: 5312 Basic stats: 
COMPLETE Column stats: NONE + +Warning: Map Join MAPJOIN[28][bigTable=?] in task 'Reducer 3' is a cross product +PREHOOK: query: select * +from src +where not key in +(select key from src) +order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select * +from src +where not key in +(select key from src) +order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### diff --git ql/src/test/results/clientpositive/vector_groupby_mapjoin.q.out ql/src/test/results/clientpositive/vector_groupby_mapjoin.q.out new file mode 100644 index 0000000..367eb59 --- /dev/null +++ ql/src/test/results/clientpositive/vector_groupby_mapjoin.q.out @@ -0,0 +1,167 @@ +Warning: Map Join MAPJOIN[33][bigTable=?] in task 'Stage-3:MAPRED' is a cross product +PREHOOK: query: -- HIVE-12738 -- We are checking if a MapJoin after a GroupBy will work properly. +explain +select * +from src +where not key in +(select key from src) +order by key +PREHOOK: type: QUERY +POSTHOOK: query: -- HIVE-12738 -- We are checking if a MapJoin after a GroupBy will work properly. 
+explain +select * +from src +where not key in +(select key from src) +order by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-4 is a root stage + Stage-8 depends on stages: Stage-4 + Stage-3 depends on stages: Stage-8 + Stage-0 depends on stages: Stage-3 + +STAGE PLANS: + Stage: Stage-4 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (_col0 = 0) (type: boolean) + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-8 + Map Reduce Local Work + Alias -> Map Local Tables: + $hdt$_0:src + Fetch Operator + limit: -1 + $hdt$_2:src + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + $hdt$_0:src + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic 
stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + HashTable Sink Operator + keys: + 0 + 1 + $hdt$_2:src + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + HashTable Sink Operator + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + + Stage: Stage-3 + Map Reduce + Map Operator Tree: + TableScan + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 + 1 + outputColumnNames: _col0, _col1 + Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Left Outer Join0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1, _col3 + Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: _col3 is null (type: boolean) + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) + Local Work: + Map Reduce Local Work + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + File Output Operator + 
compressed: false + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +Warning: Map Join MAPJOIN[33][bigTable=?] in task 'Stage-3:MAPRED' is a cross product +PREHOOK: query: select * +from src +where not key in +(select key from src) +order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select * +from src +where not key in +(select key from src) +order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here ####