From ba6c978f657dc8f03a1e6ce4d4793f91aa78b9b4 Mon Sep 17 00:00:00 2001 From: Gopal V Date: Wed, 18 Jan 2017 01:50:11 -0800 Subject: [PATCH] HIVE-15655: Optimizer: Allow config option to disable n-way JOIN merging (Gopal V) --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 2 + .../test/resources/testconfiguration.properties | 1 + .../hadoop/hive/ql/parse/CalcitePlanner.java | 4 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 5 +- ql/src/test/queries/clientpositive/tez_nway_join.q | 22 ++ .../clientpositive/llap/tez_nway_join.q.out | 287 +++++++++++++++++++++ 6 files changed, 318 insertions(+), 3 deletions(-) create mode 100644 ql/src/test/queries/clientpositive/tez_nway_join.q create mode 100644 ql/src/test/results/clientpositive/llap/tez_nway_join.q.out diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index c88ca87..dfcbd4a 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3209,6 +3209,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Maximum total data size in dynamic pruning."), NWAYJOINREORDER("hive.reorder.nway.joins", true, "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"), + HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true, + "Merge adjacent joins into a single n-way join"), HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null), "If value is greater than 0 logs in fixed intervals of size n rather than exponentially."), HIVE_MSCK_PATH_VALIDATION("hive.msck.path.validation", "throw", diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index be5a747..7c1caf8 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -577,6 +577,7 @@ minillaplocal.query.files=acid_globallimit.q,\ tez_join_tests.q,\ tez_joins_explain.q,\ tez_multi_union.q,\ + tez_nway_join.q,\ tez_schema_evolution.q,\ tez_self_join.q,\ tez_smb_1.q,\ diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 9f1b9d5..6f10f8a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -392,7 +392,7 @@ Operator genOPTree(ASTNode ast, PlannerContext plannerCtx) throws SemanticExcept // PartitionList is not evaluated until the run phase. getMetaData(getQB()); - disableJoinMerge = false; + disableJoinMerge = defaultJoinMerge; sinkOp = genPlan(getQB()); LOG.info("CBO Succeeded; optimized logical plan."); this.ctx.setCboInfo("Plan optimized by CBO."); @@ -437,7 +437,7 @@ Operator genOPTree(ASTNode ast, PlannerContext plannerCtx) throws SemanticExcept } } finally { runCBO = false; - disableJoinMerge = false; + disableJoinMerge = defaultJoinMerge; disableSemJoinReordering = false; if (reAnalyzeAST) { init(true); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index f275f6a..cf6b4dd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -315,6 +315,7 @@ protected boolean partialscan; protected volatile boolean disableJoinMerge = false; + protected final boolean defaultJoinMerge; /* * Capture the CTE definitions in a Query. @@ -391,6 +392,8 @@ public SemanticAnalyzer(QueryState queryState) throws SemanticException { mergeIsDirect = true; noscan = partialscan = false; tabNameToTabObject = new HashMap<>(); + defaultJoinMerge = false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MERGE_NWAY_JOINS); + disableJoinMerge = defaultJoinMerge; } @Override @@ -420,7 +423,7 @@ protected void reset(boolean clearPartsCache) { smbMapJoinContext.clear(); opParseCtx.clear(); groupOpToInputTables.clear(); - disableJoinMerge = false; + disableJoinMerge = defaultJoinMerge; aliasToCTEs.clear(); topToTable.clear(); opToPartPruner.clear(); diff --git ql/src/test/queries/clientpositive/tez_nway_join.q ql/src/test/queries/clientpositive/tez_nway_join.q new file mode 100644 index 0000000..2c5ca16 --- /dev/null +++ ql/src/test/queries/clientpositive/tez_nway_join.q @@ -0,0 +1,22 @@ +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; +set hive.auto.convert.join=true; +-- SORT_QUERY_RESULTS + +create temporary table foo (key int) stored as orc; +create temporary table bar (key int) stored as orc; + +insert into foo values(1),(2),(3); +insert into bar values(2),(4); + +set hive.merge.nway.joins=true; +explain select count(*) from foo a join bar b on (a.key = b.key) join bar c on (a.key = c.key); + +set hive.merge.nway.joins=false; +explain select count(*) from foo a join bar b on (a.key = b.key) join bar c on (a.key = c.key); + +set hive.merge.nway.joins=true; +select count(*) from foo a join bar b on (a.key = b.key) join bar c on (a.key = c.key); + +set hive.merge.nway.joins=false; +select count(*) from foo a join bar b on (a.key = b.key) join bar c on (a.key = c.key); diff --git ql/src/test/results/clientpositive/llap/tez_nway_join.q.out ql/src/test/results/clientpositive/llap/tez_nway_join.q.out new file mode 100644 index 0000000..367a788 --- /dev/null +++ ql/src/test/results/clientpositive/llap/tez_nway_join.q.out @@ -0,0 +1,287 @@ +PREHOOK: query: -- SORT_QUERY_RESULTS + +create temporary table foo (key int) stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@foo +POSTHOOK: query: -- SORT_QUERY_RESULTS + +create temporary table foo (key int) stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@foo +PREHOOK: query: create temporary table bar (key int) stored as orc +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@bar +POSTHOOK: query: create temporary table bar (key int) stored as orc +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@bar +PREHOOK: query: insert into foo values(1),(2),(3) +PREHOOK: type: QUERY +PREHOOK: Output: default@foo +POSTHOOK: query: insert into foo values(1),(2),(3) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@foo +POSTHOOK: Lineage: foo.key EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: insert into bar values(2),(4) +PREHOOK: type: QUERY +PREHOOK: Output: default@bar +POSTHOOK: query: insert into bar values(2),(4) +POSTHOOK: type: QUERY +POSTHOOK: Output: default@bar +POSTHOOK: Lineage: bar.key EXPRESSION [(values__tmp__table__2)values__tmp__table__2.FieldSchema(name:tmp_values_col1, type:string, comment:), ] +PREHOOK: query: explain select count(*) from foo a join bar b on (a.key = b.key) join bar c on (a.key = c.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain select count(*) from foo a join bar b on (a.key = b.key) join bar c on (a.key = c.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE), Map 4 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 46 Data size: 184 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 46 Data size: 184 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 46 Data size: 184 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + Inner Join 0 to 2 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + 2 _col0 (type: int) + input vertices: + 1 Map 3 + 2 Map 4 + Statistics: Num rows: 101 Data size: 404 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Execution mode: llap + LLAP IO: all inputs + Map 3 + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Execution mode: llap + LLAP IO: all inputs + Map 4 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Execution mode: llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: explain select count(*) from foo a join bar b on (a.key = b.key) join bar c on (a.key = c.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain select count(*) from foo a join bar b on (a.key = b.key) join bar c on (a.key = c.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE), Map 4 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + Statistics: Num rows: 46 Data size: 184 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 46 Data size: 184 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 46 Data size: 184 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + outputColumnNames: _col0 + input vertices: + 1 Map 3 + Statistics: Num rows: 50 Data size: 202 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + input vertices: + 1 Map 4 + Statistics: Num rows: 55 Data size: 222 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Execution mode: llap + LLAP IO: all inputs + Map 3 + Map Operator Tree: + TableScan + alias: b + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Execution mode: llap + LLAP IO: all inputs + Map 4 + Map Operator Tree: + TableScan + alias: c + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: key (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 45 Data size: 183 Basic stats: COMPLETE Column stats: NONE + Execution mode: llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from foo a join bar b on (a.key = b.key) join bar c on (a.key = c.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@bar +PREHOOK: Input: default@foo +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from foo a join bar b on (a.key = b.key) join bar c on (a.key = c.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bar +POSTHOOK: Input: default@foo +#### A masked pattern was here #### +1 +PREHOOK: query: select count(*) from foo a join bar b on (a.key = b.key) join bar c on (a.key = c.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@bar +PREHOOK: Input: default@foo +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from foo a join bar b on (a.key = b.key) join bar c on (a.key = c.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bar +POSTHOOK: Input: default@foo +#### A masked pattern was here #### +1 -- 2.4.0