diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java index 762f734..daf9698 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java @@ -288,9 +288,6 @@ private void visit(SparkTran child, Set seen, List result) * @param child */ public void connect(SparkTran parent, SparkTran child) { - if (getChildren(parent).contains(child)) { - throw new IllegalStateException("Connection already exists"); - } rootTrans.remove(child); leafTrans.remove(parent); if (transGraph.get(parent) == null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java index b7c57e8..9c4c25e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java @@ -210,16 +210,6 @@ private boolean compareWork(BaseWork first, BaseWork second, SparkWork sparkWork return false; } - // If these two Works share the same child, we can not combine them as SparkPlan does not - // support multi edge between two Works. - List firstChildren = sparkWork.getChildren(first); - List secondChildren = sparkWork.getChildren(second); - for (BaseWork child : firstChildren) { - if (secondChildren.contains(child)) { - return false; - } - } - Set> firstRootOperators = first.getAllRootOperators(); Set> secondRootOperators = second.getAllRootOperators(); if (firstRootOperators.size() != secondRootOperators.size()) { diff --git a/ql/src/test/queries/clientpositive/dynamic_rdd_cache.q b/ql/src/test/queries/clientpositive/dynamic_rdd_cache.q index a380b15..c77c8f3 100644 --- a/ql/src/test/queries/clientpositive/dynamic_rdd_cache.q +++ b/ql/src/test/queries/clientpositive/dynamic_rdd_cache.q @@ -97,6 +97,13 @@ ORDER BY inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean,inv1.cov ,inv2.d_moy,inv2.mean, inv2.cov ; +EXPLAIN +WITH test AS +(SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM inventory + UNION ALL + SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM inventory) +SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM test SORT BY inv_quantity_on_hand; + DROP TABLE inv; DROP TABLE inventory; DROP TABLE item; diff --git a/ql/src/test/results/clientpositive/dynamic_rdd_cache.q.out b/ql/src/test/results/clientpositive/dynamic_rdd_cache.q.out index bc716a0..a879d4e 100644 --- a/ql/src/test/results/clientpositive/dynamic_rdd_cache.q.out +++ b/ql/src/test/results/clientpositive/dynamic_rdd_cache.q.out @@ -1309,6 +1309,75 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: EXPLAIN +WITH test AS +(SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM inventory + UNION ALL + SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM inventory) +SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM test SORT BY inv_quantity_on_hand +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +WITH test AS +(SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM inventory + UNION ALL + SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM inventory) +SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM test SORT BY inv_quantity_on_hand +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: inventory + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: inv_date_sk (type: int), inv_item_sk (type: int), inv_quantity_on_hand (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Union + Statistics: Num rows: 2 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + key expressions: _col2 (type: int) + sort order: + + Statistics: Num rows: 2 Data size: 0 Basic stats: PARTIAL Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int) + TableScan + alias: inventory + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: inv_date_sk (type: int), inv_item_sk (type: int), inv_quantity_on_hand (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Union + Statistics: Num rows: 2 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + key expressions: _col2 (type: int) + sort order: + + Statistics: Num rows: 2 Data size: 0 Basic stats: PARTIAL Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int) + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), VALUE._col1 (type: int), KEY.reducesinkkey0 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 2 Data size: 0 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: true + Statistics: Num rows: 2 Data size: 0 Basic stats: PARTIAL Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + PREHOOK: query: DROP TABLE inv PREHOOK: type: DROPTABLE PREHOOK: Input: default@inv diff --git a/ql/src/test/results/clientpositive/spark/dynamic_rdd_cache.q.out b/ql/src/test/results/clientpositive/spark/dynamic_rdd_cache.q.out index 505cc59..02d0419 100644 --- a/ql/src/test/results/clientpositive/spark/dynamic_rdd_cache.q.out +++ b/ql/src/test/results/clientpositive/spark/dynamic_rdd_cache.q.out @@ -964,6 +964,65 @@ STAGE PLANS: Processor Tree: ListSink +PREHOOK: query: EXPLAIN +WITH test AS +(SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM inventory + UNION ALL + SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM inventory) +SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM test SORT BY inv_quantity_on_hand +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +WITH test AS +(SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM inventory + UNION ALL + SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM inventory) +SELECT inv_date_sk , inv_item_sk ,inv_quantity_on_hand FROM test SORT BY inv_quantity_on_hand +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 31), Map 1 (PARTITION-LEVEL SORT, 31) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: inventory + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: inv_date_sk (type: int), inv_item_sk (type: int), inv_quantity_on_hand (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Reduce Output Operator + key expressions: _col2 (type: int) + sort order: + + Statistics: Num rows: 2 Data size: 0 Basic stats: PARTIAL Column stats: NONE + value expressions: _col0 (type: int), _col1 (type: int) + Reducer 2 + Reduce Operator Tree: + Select Operator + expressions: VALUE._col0 (type: int), VALUE._col1 (type: int), KEY.reducesinkkey0 (type: int) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 2 Data size: 0 Basic stats: PARTIAL Column stats: NONE + File Output Operator + compressed: true + Statistics: Num rows: 2 Data size: 0 Basic stats: PARTIAL Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + PREHOOK: query: DROP TABLE inv PREHOOK: type: DROPTABLE PREHOOK: Input: default@inv