diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index 3efc1ac..2d8117f 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -264,6 +264,7 @@ minitez.query.files.shared=acid_globallimit.q,\ vector_distinct_2.q,\ vector_elt.q,\ vector_groupby_3.q,\ + vector_groupby_mapjoin.q,\ vector_groupby_reduce.q,\ vector_grouping_sets.q,\ vector_if_expr.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index a842649..ee85286 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -873,9 +873,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { for (Node n : stack) { Operator op = (Operator) n; - if (nonVectorizableChildOfGroupBy(op)) { - return new Boolean(true); - } boolean ret; try { ret = validateMapWorkOperator(op, mapWork, isTez); @@ -886,6 +883,11 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized."); return new Boolean(false); } + // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't + // vectorize the operators below it. + if (isVectorizedGroupByThatOutputsRows(op)) { + return new Boolean(true); + } } return new Boolean(true); } @@ -898,14 +900,16 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { for (Node n : stack) { Operator op = (Operator) n; - if (nonVectorizableChildOfGroupBy(op)) { - return new Boolean(true); - } boolean ret = validateReduceWorkOperator(op); if (!ret) { LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized."); return new Boolean(false); } + // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't + // vectorize the operators below it. + if (isVectorizedGroupByThatOutputsRows(op)) { + return new Boolean(true); + } } return new Boolean(true); } @@ -995,18 +999,25 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, private VectorTaskColumnInfo vectorTaskColumnInfo; private final boolean isTez; + private boolean stopVectorizing; + public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez, VectorTaskColumnInfo vectorTaskColumnInfo) { super(); this.mWork = mWork; this.vectorTaskColumnInfo = vectorTaskColumnInfo; this.isTez = isTez; + stopVectorizing = false; } @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + if (stopVectorizing) { + return null; + } + Operator op = (Operator) nd; VectorizationContext vContext = null; @@ -1031,16 +1042,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, + " using vectorization context" + vContext.toString()); } - // When Vectorized GROUPBY outputs rows instead of vectorized row batchs, we don't - // vectorize the operators below it. - if (nonVectorizableChildOfGroupBy(op)) { - // No need to vectorize - if (!opsDone.contains(op)) { - opsDone.add(op); - } - return null; - } - Operator vectorOp = doVectorize(op, vContext, isTez); if (LOG.isDebugEnabled()) { @@ -1051,6 +1052,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } } + // We don't vectorize after hitting a Vectorized GROUP BY that outputs rows. 
+ stopVectorizing = isVectorizedGroupByThatOutputsRows(op); + return null; } } @@ -1063,6 +1067,8 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, private Operator rootVectorOp; + private boolean stopVectorizing; + public Operator getRootVectorOp() { return rootVectorOp; } @@ -1074,12 +1080,17 @@ public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColum this.vectorTaskColumnInfo = vectorTaskColumnInfo; rootVectorOp = null; this.isTez = isTez; + stopVectorizing = false; } @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { + if (stopVectorizing) { + return null; + } + Operator op = (Operator) nd; VectorizationContext vContext = null; @@ -1110,16 +1121,6 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, assert vContext != null; LOG.info("ReduceWorkVectorizationNodeProcessor process operator " + op.getName() + " using vectorization context" + vContext.toString()); - // When Vectorized GROUPBY outputs rows instead of vectorized row batchs, we don't - // vectorize the operators below it. - if (nonVectorizableChildOfGroupBy(op)) { - // No need to vectorize - if (!opsDone.contains(op)) { - opsDone.add(op); - } - return null; - } - Operator vectorOp = doVectorize(op, vContext, isTez); if (LOG.isDebugEnabled()) { @@ -1133,6 +1134,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, rootVectorOp = vectorOp; } + // We don't vectorize after hitting a Vectorized GROUP BY that outputs rows. 
+ stopVectorizing = isVectorizedGroupByThatOutputsRows(op); + + return null; } } @@ -1267,19 +1271,13 @@ boolean validateReduceWorkOperator(Operator op) { return ret; } - public Boolean nonVectorizableChildOfGroupBy(Operator op) { - Operator currentOp = op; - while (currentOp.getParentOperators().size() > 0) { - currentOp = currentOp.getParentOperators().get(0); - if (currentOp.getType().equals(OperatorType.GROUPBY)) { - GroupByDesc desc = (GroupByDesc)currentOp.getConf(); - boolean isVectorOutput = desc.getVectorDesc().isVectorOutput(); - if (isVectorOutput) { - // This GROUP BY does vectorize its output. - return false; - } - return true; - } + // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't + // vectorize the operators below it. + public Boolean isVectorizedGroupByThatOutputsRows(Operator op) + throws SemanticException { + if (op.getType().equals(OperatorType.GROUPBY)) { + GroupByDesc desc = (GroupByDesc) op.getConf(); + return !desc.getVectorDesc().isVectorOutput(); } return false; } diff --git ql/src/test/queries/clientpositive/vector_groupby_mapjoin.q ql/src/test/queries/clientpositive/vector_groupby_mapjoin.q new file mode 100644 index 0000000..a5dd2fe --- /dev/null +++ ql/src/test/queries/clientpositive/vector_groupby_mapjoin.q @@ -0,0 +1,22 @@ +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; +SET hive.vectorized.execution.enabled = true; +set hive.fetch.task.conversion=none; +SET hive.auto.convert.join=true; +SET hive.auto.convert.join.noconditionaltask=true; +SET hive.auto.convert.join.noconditionaltask.size=1000000000; +set hive.exec.dynamic.partition.mode=nonstrict; + +-- HIVE-12738 -- We are checking if a MapJoin after a GroupBy will work properly. 
+explain +select * +from src +where not key in +(select key from src) +order by key; + +select * +from src +where not key in +(select key from src) +order by key; \ No newline at end of file diff --git ql/src/test/results/clientpositive/tez/vector_groupby_mapjoin.q.out ql/src/test/results/clientpositive/tez/vector_groupby_mapjoin.q.out new file mode 100644 index 0000000..42a67c2 --- /dev/null +++ ql/src/test/results/clientpositive/tez/vector_groupby_mapjoin.q.out @@ -0,0 +1,159 @@ +Warning: Map Join MAPJOIN[29][bigTable=?] in task 'Reducer 3' is a cross product +PREHOOK: query: -- HIVE-12738 -- We are checking if a MapJoin after a GroupBy will work properly. +explain +select * +from src +where not key in +(select key from src) +order by key +PREHOOK: type: QUERY +POSTHOOK: query: -- HIVE-12738 -- We are checking if a MapJoin after a GroupBy will work properly. +explain +select * +from src +where not key in +(select key from src) +order by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 3 <- Map 1 (BROADCAST_EDGE), Map 2 (SIMPLE_EDGE), Map 5 (BROADCAST_EDGE) + Reducer 4 <- Reducer 3 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: string), _col1 (type: string) + Map 2 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is null (type: 
boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 5 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Reducer 3 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (_col0 = 0) (type: boolean) + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 + 1 + outputColumnNames: _col0, _col1 + input vertices: + 0 Map 1 + Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Left Outer Join0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1, _col3 + input vertices: + 1 Map 5 + Statistics: Num rows: 605 Data size: 6427 Basic stats: 
COMPLETE Column stats: NONE + HybridGraceHashJoin: true + Filter Operator + predicate: _col3 is null (type: boolean) + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) + Reducer 4 + Execution mode: vectorized + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +Warning: Map Join MAPJOIN[29][bigTable=?] 
in task 'Reducer 3' is a cross product +PREHOOK: query: select * +from src +where not key in +(select key from src) +order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select * +from src +where not key in +(select key from src) +order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### diff --git ql/src/test/results/clientpositive/vector_groupby_mapjoin.q.out ql/src/test/results/clientpositive/vector_groupby_mapjoin.q.out new file mode 100644 index 0000000..93cac71 --- /dev/null +++ ql/src/test/results/clientpositive/vector_groupby_mapjoin.q.out @@ -0,0 +1,167 @@ +Warning: Map Join MAPJOIN[34][bigTable=?] in task 'Stage-3:MAPRED' is a cross product +PREHOOK: query: -- HIVE-12738 -- We are checking if a MapJoin after a GroupBy will work properly. +explain +select * +from src +where not key in +(select key from src) +order by key +PREHOOK: type: QUERY +POSTHOOK: query: -- HIVE-12738 -- We are checking if a MapJoin after a GroupBy will work properly. 
+explain +select * +from src +where not key in +(select key from src) +order by key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-4 is a root stage + Stage-8 depends on stages: Stage-4 + Stage-3 depends on stages: Stage-8 + Stage-0 depends on stages: Stage-3 + +STAGE PLANS: + Stage: Stage-4 + Map Reduce + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: (_col0 = 0) (type: boolean) + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Select Operator + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-8 + Map Reduce Local Work + Alias -> Map Local Tables: + $hdt$_0:src + Fetch Operator + limit: -1 + $hdt$_2:src + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + $hdt$_0:src + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic 
stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + HashTable Sink Operator + keys: + 0 + 1 + $hdt$_2:src + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + HashTable Sink Operator + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + + Stage: Stage-3 + Map Reduce + Map Operator Tree: + TableScan + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 + 1 + outputColumnNames: _col0, _col1 + Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Left Outer Join0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + outputColumnNames: _col0, _col1, _col3 + Statistics: Num rows: 605 Data size: 6427 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: _col3 is null (type: boolean) + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: _col0 (type: string), _col1 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) + Local Work: + Map Reduce Local Work + Reduce Operator Tree: + Select Operator + expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + File Output Operator + 
compressed: false + Statistics: Num rows: 302 Data size: 3208 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +Warning: Map Join MAPJOIN[34][bigTable=?] in task 'Stage-3:MAPRED' is a cross product +PREHOOK: query: select * +from src +where not key in +(select key from src) +order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select * +from src +where not key in +(select key from src) +order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here ####