diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java index b41d77d..879aa58 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java @@ -397,24 +397,31 @@ private boolean sameOrder(String order1, String order2) { } } if (current instanceof JoinOperator) { - LinkedHashSet correlatedRsOps = - new LinkedHashSet(); + boolean isCorrelated = true; + int expectedNumCorrelatedRsops = current.getParentOperators().size(); + LinkedHashSet correlatedRsops = null; for (Operator parent : current.getParentOperators()) { Set tableNames = pCtx.getOpParseCtx().get(parent).getRowResolver().getTableNames(); for (String tbl : tableNames) { if (tableNeedToCheck.contains(tbl)) { - correlatedRsOps.addAll(findCorrelatedReduceSinkOperators( - current, backtrackedKeyCols, backtrackedPartitionCols, childRSOrder, - parent, correlation)); + correlatedRsops = findCorrelatedReduceSinkOperators(current, + backtrackedKeyCols, backtrackedPartitionCols, + childRSOrder, parent, correlation); + if (correlatedRsops.size() != expectedNumCorrelatedRsops) { + isCorrelated = false; + } } } + if (!isCorrelated) { + break; + } } // If current is JoinOperaotr, we will stop to traverse the tree // when any of parent ReduceSinkOperaotr of this JoinOperator is // not considered as a correlated ReduceSinkOperator. - if (correlatedRsOps.size() == current.getParentOperators().size()) { - correlatedReduceSinkOperators.addAll(correlatedRsOps); + if (isCorrelated && correlatedRsops != null) { + correlatedReduceSinkOperators.addAll(correlatedRsops); } else { correlatedReduceSinkOperators.clear(); } diff --git ql/src/test/queries/clientpositive/correlationoptimizer1.q ql/src/test/queries/clientpositive/correlationoptimizer1.q index b3fd3f7..0596f96 100644 --- ql/src/test/queries/clientpositive/correlationoptimizer1.q +++ ql/src/test/queries/clientpositive/correlationoptimizer1.q @@ -104,7 +104,7 @@ FROM (SELECT x.key AS key, count(1) AS cnt set hive.optimize.correlation=false; -- If the key of a GroupByOperator is the right table's key in --- a Left Outer Join, we cannot use a single MR to execute these two +-- a Left Outer Join, we cannot use a single MR to execute these two -- operators because those keys with a null value are not grouped. EXPLAIN SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)) @@ -130,6 +130,29 @@ FROM (SELECT y.key AS key, count(1) AS cnt GROUP BY y.key) tmp; set hive.optimize.correlation=false; +-- If a column of the key of a GroupByOperator is the right table's key in +-- a Left Outer Join, we cannot use a single MR to execute these two +-- operators because those keys with a null value are not grouped. +EXPLAIN +SELECT x.key, y.value, count(1) AS cnt +FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key AND x.value = y.value) +GROUP BY x.key, y.value; + +SELECT x.key, y.value, count(1) AS cnt +FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key AND x.value = y.value) +GROUP BY x.key, y.value; + +set hive.optimize.correlation=true; +EXPLAIN +SELECT x.key, y.value, count(1) AS cnt +FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key AND x.value = y.value) +GROUP BY x.key, y.value; + +SELECT x.key, y.value, count(1) AS cnt +FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key AND x.value = y.value) +GROUP BY x.key, y.value; + +set hive.optimize.correlation=false; -- If the key of a GroupByOperator is the right table's key in -- a Right Outer Join, these two operators will be executed in -- the same MR job when Correlation Optimizer is enabled. diff --git ql/src/test/results/clientpositive/correlationoptimizer1.q.out ql/src/test/results/clientpositive/correlationoptimizer1.q.out index dc46a77..c438ce5 100644 --- ql/src/test/results/clientpositive/correlationoptimizer1.q.out +++ ql/src/test/results/clientpositive/correlationoptimizer1.q.out @@ -1269,7 +1269,7 @@ POSTHOOK: Input: default@src1 #### A masked pattern was here #### 652447 47 PREHOOK: query: -- If the key of a GroupByOperator is the right table's key in --- a Left Outer Join, we cannot use a single MR to execute these two +-- a Left Outer Join, we cannot use a single MR to execute these two -- operators because those keys with a null value are not grouped. EXPLAIN SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)) @@ -1278,7 +1278,7 @@ FROM (SELECT y.key AS key, count(1) AS cnt GROUP BY y.key) tmp PREHOOK: type: QUERY POSTHOOK: query: -- If the key of a GroupByOperator is the right table's key in --- a Left Outer Join, we cannot use a single MR to execute these two +-- a Left Outer Join, we cannot use a single MR to execute these two -- operators because those keys with a null value are not grouped. EXPLAIN SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)) @@ -1654,6 +1654,372 @@ POSTHOOK: Input: default@src POSTHOOK: Input: default@src1 #### A masked pattern was here #### 652447 47 +PREHOOK: query: -- If a column of the key of a GroupByOperator is the right table's key in +-- a Left Outer Join, we cannot use a single MR to execute these two +-- operators because those keys with a null value are not grouped. +EXPLAIN +SELECT x.key, y.value, count(1) AS cnt +FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key AND x.value = y.value) +GROUP BY x.key, y.value +PREHOOK: type: QUERY +POSTHOOK: query: -- If a column of the key of a GroupByOperator is the right table's key in +-- a Left Outer Join, we cannot use a single MR to execute these two +-- operators because those keys with a null value are not grouped. +EXPLAIN +SELECT x.key, y.value, count(1) AS cnt +FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key AND x.value = y.value) +GROUP BY x.key, y.value +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_LEFTOUTERJOIN (TOK_TABREF (TOK_TABNAME src1) x) (TOK_TABREF (TOK_TABNAME src) y) (AND (= (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL y) key)) (= (. (TOK_TABLE_OR_COL x) value) (. (TOK_TABLE_OR_COL y) value))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL x) key)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL y) value)) (TOK_SELEXPR (TOK_FUNCTION count 1) cnt)) (TOK_GROUPBY (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL y) value)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + x + TableScan + alias: x + Reduce Output Operator + key expressions: + expr: key + type: string + expr: value + type: string + sort order: ++ + Map-reduce partition columns: + expr: key + type: string + expr: value + type: string + tag: 0 + value expressions: + expr: key + type: string + y + TableScan + alias: y + Reduce Output Operator + key expressions: + expr: key + type: string + expr: value + type: string + sort order: ++ + Map-reduce partition columns: + expr: key + type: string + expr: value + type: string + tag: 1 + value expressions: + expr: value + type: string + Reduce Operator Tree: + Join Operator + condition map: + Left Outer Join0 to 1 + condition expressions: + 0 {VALUE._col0} + 1 {VALUE._col1} + handleSkewJoin: false + outputColumnNames: _col0, _col5 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col5 + type: string + outputColumnNames: _col0, _col5 + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: _col0 + type: string + expr: _col5 + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + sort order: ++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + tag: -1 + value expressions: + expr: _col2 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: bigint + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT x.key, y.value, count(1) AS cnt +FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key AND x.value = y.value) +GROUP BY x.key, y.value +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@src1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT x.key, y.value, count(1) AS cnt +FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key AND x.value = y.value) +GROUP BY x.key, y.value +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Input: default@src1 +#### A masked pattern was here #### + NULL 10 +128 NULL 1 +146 val_146 2 +150 val_150 1 +213 val_213 2 +224 NULL 1 +238 val_238 2 +255 val_255 2 +273 val_273 3 +278 val_278 2 +311 val_311 3 +369 NULL 1 +401 val_401 5 +406 val_406 4 +66 val_66 1 +98 val_98 2 +PREHOOK: query: EXPLAIN +SELECT x.key, y.value, count(1) AS cnt +FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key AND x.value = y.value) +GROUP BY x.key, y.value +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT x.key, y.value, count(1) AS cnt +FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key AND x.value = y.value) +GROUP BY x.key, y.value +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_LEFTOUTERJOIN (TOK_TABREF (TOK_TABNAME src1) x) (TOK_TABREF (TOK_TABNAME src) y) (AND (= (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL y) key)) (= (. (TOK_TABLE_OR_COL x) value) (. (TOK_TABLE_OR_COL y) value))))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL x) key)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL y) value)) (TOK_SELEXPR (TOK_FUNCTION count 1) cnt)) (TOK_GROUPBY (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL y) value)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-2 depends on stages: Stage-1 + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + x + TableScan + alias: x + Reduce Output Operator + key expressions: + expr: key + type: string + expr: value + type: string + sort order: ++ + Map-reduce partition columns: + expr: key + type: string + expr: value + type: string + tag: 0 + value expressions: + expr: key + type: string + y + TableScan + alias: y + Reduce Output Operator + key expressions: + expr: key + type: string + expr: value + type: string + sort order: ++ + Map-reduce partition columns: + expr: key + type: string + expr: value + type: string + tag: 1 + value expressions: + expr: value + type: string + Reduce Operator Tree: + Join Operator + condition map: + Left Outer Join0 to 1 + condition expressions: + 0 {VALUE._col0} + 1 {VALUE._col1} + handleSkewJoin: false + outputColumnNames: _col0, _col5 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col5 + type: string + outputColumnNames: _col0, _col5 + Group By Operator + aggregations: + expr: count(1) + bucketGroup: false + keys: + expr: _col0 + type: string + expr: _col5 + type: string + mode: hash + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-2 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + Reduce Output Operator + key expressions: + expr: _col0 + type: string + expr: _col1 + type: string + sort order: ++ + Map-reduce partition columns: + expr: _col0 + type: string + expr: _col1 + type: string + tag: -1 + value expressions: + expr: _col2 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + expr: KEY._col1 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1, _col2 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: string + expr: _col2 + type: bigint + outputColumnNames: _col0, _col1, _col2 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + + +PREHOOK: query: SELECT x.key, y.value, count(1) AS cnt +FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key AND x.value = y.value) +GROUP BY x.key, y.value +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@src1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT x.key, y.value, count(1) AS cnt +FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key AND x.value = y.value) +GROUP BY x.key, y.value +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Input: default@src1 +#### A masked pattern was here #### + NULL 10 +128 NULL 1 +146 val_146 2 +150 val_150 1 +213 val_213 2 +224 NULL 1 +238 val_238 2 +255 val_255 2 +273 val_273 3 +278 val_278 2 +311 val_311 3 +369 NULL 1 +401 val_401 5 +406 val_406 4 +66 val_66 1 +98 val_98 2 PREHOOK: query: -- If the key of a GroupByOperator is the right table's key in -- a Right Outer Join, these two operators will be executed in -- the same MR job when Correlation Optimizer is enabled.