diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8a906ce7ce..1f9a4688c8 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1656,7 +1656,10 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true,
         "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" +
-        "and follow-up operators in the query plan and merges them if they meet some preconditions."),
+        "and follow-up operators in the query plan and merges them if they meet some preconditions. Tez only."),
+    HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION("hive.combine.equivalent.work.optimization", true, "Whether to " +
+        "combine equivalent work objects during physical optimization.\nThis optimization looks for equivalent " +
+        "work objects and combines them if they meet certain preconditions. Spark only."),
     HIVE_REMOVE_SQ_COUNT_CHECK("hive.optimize.remove.sq_count_check", false,
         "Whether to remove an extra join with sq_count_check for scalar subqueries "
             + "with constant group by keys."),
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index d472bb3f9e..59b9321568 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1400,7 +1400,8 @@ spark.query.files=add_part_multiple.q, \
 
 # Unlike "spark.query.files" above, these tests only run
 # under Spark engine and only use TestSparkCliDriver.
-spark.only.query.files=spark_union_merge.q
+spark.only.query.files=spark_union_merge.q,\
+  spark_combine_equivalent_work_2.q
 
 # Unlike "miniSparkOnYarn.query.files" below, these tests only run
 # under Spark engine and only use TestMiniSparkOnYarnCliDriver.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 8144350bf9..965044d925 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -599,7 +599,11 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, Pa
       LOG.debug("Skipping stage id rearranger");
     }
 
-    new CombineEquivalentWorkResolver().resolve(physicalCtx);
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION)) {
+      new CombineEquivalentWorkResolver().resolve(physicalCtx);
+    } else {
+      LOG.debug("Skipping combine equivalent work optimization");
+    }
 
     if (physicalCtx.getContext().getExplainAnalyze() != null) {
       new AnnotateRunTimeStatsOptimizer().resolve(physicalCtx);
diff --git a/ql/src/test/queries/clientpositive/spark_combine_equivalent_work_2.q b/ql/src/test/queries/clientpositive/spark_combine_equivalent_work_2.q
new file mode 100644
index 0000000000..2ff786a448
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_combine_equivalent_work_2.q
@@ -0,0 +1,41 @@
+-- SORT_QUERY_RESULTS
+
+explain
+SELECT *
+FROM (
+  SELECT 1 AS id
+  FROM (SELECT * FROM src LIMIT 1) s1
+  UNION ALL
+  SELECT 2 AS id
+  FROM (SELECT * FROM src LIMIT 1) s2
+) a;
+
+SELECT *
+FROM (
+  SELECT 1 AS id
+  FROM (SELECT * FROM src LIMIT 1) s1
+  UNION ALL
+  SELECT 2 AS id
+  FROM (SELECT * FROM src LIMIT 1) s2
+) a;
+
+set hive.combine.equivalent.work.optimization=false;
+
+explain
+SELECT *
+FROM (
+  SELECT 1 AS id
+  FROM (SELECT * FROM src LIMIT 1) s1
+  UNION ALL
+  SELECT 2 AS id
+  FROM (SELECT * FROM src LIMIT 1) s2
+) a;
+
+SELECT *
+FROM (
+  SELECT 1 AS id
+  FROM (SELECT * FROM src LIMIT 1) s1
+  UNION ALL
+  SELECT 2 AS id
+  FROM (SELECT * FROM src LIMIT 1) s2
+) a;
diff --git a/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work_2.q.out b/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work_2.q.out
new file mode 100644
index 0000000000..224e2d9ebe
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_combine_equivalent_work_2.q.out
@@ -0,0 +1,232 @@
+PREHOOK: query: explain
+SELECT *
+FROM (
+  SELECT 1 AS id
+  FROM (SELECT * FROM src LIMIT 1) s1
+  UNION ALL
+  SELECT 2 AS id
+  FROM (SELECT * FROM src LIMIT 1) s2
+) a
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+SELECT *
+FROM (
+  SELECT 1 AS id
+  FROM (SELECT * FROM src LIMIT 1) s1
+  UNION ALL
+  SELECT 2 AS id
+  FROM (SELECT * FROM src LIMIT 1) s2
+) a
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (GROUP, 1)
+        Reducer 4 <- Map 1 (GROUP, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    Statistics: Num rows: 500 Data size: 2000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Limit
+                      Number of rows: 1
+                      Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        sort order: 
+                        Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                        TopN Hash Memory Usage: 0.1
+        Reducer 2 
+            Reduce Operator Tree:
+              Limit
+                Number of rows: 1
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: 1 (type: int)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 4 
+            Reduce Operator Tree:
+              Limit
+                Number of rows: 1
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: 2 (type: int)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT *
+FROM (
+  SELECT 1 AS id
+  FROM (SELECT * FROM src LIMIT 1) s1
+  UNION ALL
+  SELECT 2 AS id
+  FROM (SELECT * FROM src LIMIT 1) s2
+) a
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT *
+FROM (
+  SELECT 1 AS id
+  FROM (SELECT * FROM src LIMIT 1) s1
+  UNION ALL
+  SELECT 2 AS id
+  FROM (SELECT * FROM src LIMIT 1) s2
+) a
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+1
+2
+PREHOOK: query: explain
+SELECT *
+FROM (
+  SELECT 1 AS id
+  FROM (SELECT * FROM src LIMIT 1) s1
+  UNION ALL
+  SELECT 2 AS id
+  FROM (SELECT * FROM src LIMIT 1) s2
+) a
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+SELECT *
+FROM (
+  SELECT 1 AS id
+  FROM (SELECT * FROM src LIMIT 1) s1
+  UNION ALL
+  SELECT 2 AS id
+  FROM (SELECT * FROM src LIMIT 1) s2
+) a
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (GROUP, 1)
+        Reducer 4 <- Map 3 (GROUP, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    Statistics: Num rows: 500 Data size: 2000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Limit
+                      Number of rows: 1
+                      Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        sort order: 
+                        Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                        TopN Hash Memory Usage: 0.1
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    Statistics: Num rows: 500 Data size: 2000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Limit
+                      Number of rows: 1
+                      Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        sort order: 
+                        Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                        TopN Hash Memory Usage: 0.1
+        Reducer 2 
+            Reduce Operator Tree:
+              Limit
+                Number of rows: 1
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: 1 (type: int)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 4 
+            Reduce Operator Tree:
+              Limit
+                Number of rows: 1
+                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: 2 (type: int)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: COMPLETE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT *
+FROM (
+  SELECT 1 AS id
+  FROM (SELECT * FROM src LIMIT 1) s1
+  UNION ALL
+  SELECT 2 AS id
+  FROM (SELECT * FROM src LIMIT 1) s2
+) a
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT *
+FROM (
+  SELECT 1 AS id
+  FROM (SELECT * FROM src LIMIT 1) s1
+  UNION ALL
+  SELECT 2 AS id
+  FROM (SELECT * FROM src LIMIT 1) s2
+) a
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+1
+2
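
A brief usage sketch (illustrative, not part of the patch itself): the new property defaults to true, so existing Spark plans keep the combine behavior; a session can opt out before running a query, as the test above does:

  set hive.combine.equivalent.work.optimization=false;
  -- subsequent EXPLAINs show equivalent works as separate Spark vertices
  -- (e.g. Map 1 and Map 3 above, instead of a single shared Map 1)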