diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index 167aefb..0a0fe74 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -114,6 +115,14 @@ public static void processSkewJoin(JoinOperator joinOp,
       return;
     }
 
+    List<Task<? extends Serializable>> children = currTask.getChildTasks();
+    if (children != null && children.size() > 1) {
+      throw new SemanticException("Should not happen");
+    }
+
+    Task<? extends Serializable> child =
+        children != null && children.size() == 1 ? children.get(0) : null;
+
     String baseTmpDir = parseCtx.getContext().getMRTmpFileURI();
 
     JoinDesc joinDescriptor = joinOp.getConf();
@@ -333,25 +342,27 @@ public static void processSkewJoin(JoinOperator joinOp,
       listWorks.add(skewJoinMapJoinTask.getWork());
       listTasks.add(skewJoinMapJoinTask);
     }
+    if (children != null) {
+      for (Task<? extends Serializable> tsk : listTasks) {
+        for (Task<? extends Serializable> oldChild : children) {
+          tsk.addDependentTask(oldChild);
+        }
+      }
+    }
+    if (child != null) {
+      listTasks.add(child);
+    }
+    ConditionalResolverSkewJoinCtx context =
+        new ConditionalResolverSkewJoinCtx(bigKeysDirToTaskMap, child);
 
     ConditionalWork cndWork = new ConditionalWork(listWorks);
     ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf());
     cndTsk.setListTasks(listTasks);
     cndTsk.setResolver(new ConditionalResolverSkewJoin());
-    cndTsk
-        .setResolverCtx(new ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx(
-        bigKeysDirToTaskMap));
-    List<Task<? extends Serializable>> oldChildTasks = currTask.getChildTasks();
+    cndTsk.setResolverCtx(context);
     currTask.setChildTasks(new ArrayList<Task<? extends Serializable>>());
     currTask.addDependentTask(cndTsk);
 
-    if (oldChildTasks != null) {
-      for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
-        for (Task<? extends Serializable> oldChild : oldChildTasks) {
-          tsk.addDependentTask(oldChild);
-        }
-      }
-    }
     return;
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
index 680977c..184941f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverSkewJoin.java
@@ -50,7 +50,8 @@
     // tables into corresponding different dirs (one dir per table).
     // this map stores mapping from "big key dir" to its corresponding mapjoin
     // task.
-    HashMap<String, Task<? extends Serializable>> dirToTaskMap;
+    private HashMap<String, Task<? extends Serializable>> dirToTaskMap;
+    private Task<? extends Serializable> noSkewTask;
 
     /**
     * For serialization use only.
@@ -59,9 +60,11 @@ public ConditionalResolverSkewJoinCtx() {
     }
 
     public ConditionalResolverSkewJoinCtx(
-        HashMap<String, Task<? extends Serializable>> dirToTaskMap) {
+        HashMap<String, Task<? extends Serializable>> dirToTaskMap,
+        Task<? extends Serializable> noSkewTask) {
       super();
       this.dirToTaskMap = dirToTaskMap;
+      this.noSkewTask = noSkewTask;
     }
 
     public HashMap<String, Task<? extends Serializable>> getDirToTaskMap() {
@@ -72,6 +75,14 @@ public void setDirToTaskMap(
         HashMap<String, Task<? extends Serializable>> dirToTaskMap) {
       this.dirToTaskMap = dirToTaskMap;
     }
+
+    public Task<? extends Serializable> getNoSkewTask() {
+      return noSkewTask;
+    }
+
+    public void setNoSkewTask(Task<? extends Serializable> noSkewTask) {
+      this.noSkewTask = noSkewTask;
+    }
   }
 
   public ConditionalResolverSkewJoin() {
@@ -111,6 +122,9 @@ public ConditionalResolverSkewJoin() {
     } catch (IOException e) {
       e.printStackTrace();
     }
+    if (resTsks.isEmpty() && ctx.getNoSkewTask() != null) {
+      resTsks.add(ctx.getNoSkewTask());
+    }
     return resTsks;
   }
 
diff --git ql/src/test/queries/clientpositive/skewjoin_noskew.q ql/src/test/queries/clientpositive/skewjoin_noskew.q
new file mode 100644
index 0000000..b8ca592
--- /dev/null
+++ ql/src/test/queries/clientpositive/skewjoin_noskew.q
@@ -0,0 +1,9 @@
+set hive.auto.convert.join=false;
+set hive.optimize.skewjoin=true;
+
+explain
+create table noskew as select a.* from src a join src b on a.key=b.key order by a.key limit 30;
+
+create table noskew as select a.* from src a join src b on a.key=b.key order by a.key limit 30;
+
+select * from noskew;
diff --git ql/src/test/results/clientpositive/skewjoin.q.out ql/src/test/results/clientpositive/skewjoin.q.out
index 1c90927..2e03b7e 100644
--- ql/src/test/results/clientpositive/skewjoin.q.out
+++ ql/src/test/results/clientpositive/skewjoin.q.out
@@ -60,7 +60,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-5 depends on stages: Stage-1 , consists of Stage-6
+  Stage-5 depends on stages: Stage-1 , consists of Stage-6, Stage-0
   Stage-6
   Stage-4 depends on stages: Stage-6
   Stage-0 depends on stages: Stage-1, Stage-4
@@ -701,7 +701,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-5 depends on stages: Stage-1 , consists of Stage-6
+  Stage-5 depends on stages: Stage-1 , consists of Stage-6, Stage-2
   Stage-6
   Stage-4 depends on stages: Stage-6
   Stage-2 depends on stages: Stage-1, Stage-4
@@ -928,7 +928,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-5 depends on stages: Stage-1 , consists of Stage-6
+  Stage-5 depends on stages: Stage-1 , consists of Stage-6, Stage-2
   Stage-6
   Stage-4 depends on stages: Stage-6
   Stage-2 depends on stages: Stage-1, Stage-4
@@ -1173,7 +1173,7 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-7 depends on stages: Stage-1 , consists of Stage-8, Stage-9
+  Stage-7 depends on stages: Stage-1 , consists of Stage-8, Stage-9, Stage-2
   Stage-8
   Stage-5 depends on stages: Stage-8
   Stage-2 depends on stages: Stage-1, Stage-5, Stage-6
diff --git ql/src/test/results/clientpositive/skewjoin_noskew.q.out ql/src/test/results/clientpositive/skewjoin_noskew.q.out
new file mode 100644
index 0000000..fe8da1e
--- /dev/null
+++ ql/src/test/results/clientpositive/skewjoin_noskew.q.out
@@ -0,0 +1,225 @@
+PREHOOK: query: explain
+create table noskew as select a.* from src a join src b on a.key=b.key order by a.key limit 30
+PREHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: query: explain
+create table noskew as select a.* from src a join src b on a.key=b.key order by a.key limit 30
+POSTHOOK: type: CREATETABLE_AS_SELECT
+ABSTRACT SYNTAX TREE:
+  (TOK_CREATETABLE (TOK_TABNAME noskew) TOK_LIKETABLE (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME src) a) (TOK_TABREF (TOK_TABNAME src) b) (= (. (TOK_TABLE_OR_COL a) key) (. (TOK_TABLE_OR_COL b) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_ALLCOLREF (TOK_TABNAME a)))) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (. (TOK_TABLE_OR_COL a) key))) (TOK_LIMIT 30))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-6 depends on stages: Stage-1 , consists of Stage-7, Stage-2
+  Stage-7
+  Stage-5 depends on stages: Stage-7
+  Stage-2 depends on stages: Stage-1, Stage-5
+  Stage-0 depends on stages: Stage-2
+  Stage-8 depends on stages: Stage-0
+  Stage-3 depends on stages: Stage-8
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        a 
+          TableScan
+            alias: a
+            Reduce Output Operator
+              key expressions:
+                    expr: key
+                    type: string
+              sort order: +
+              Map-reduce partition columns:
+                    expr: key
+                    type: string
+              tag: 0
+              value expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+        b 
+          TableScan
+            alias: b
+            Reduce Output Operator
+              key expressions:
+                    expr: key
+                    type: string
+              sort order: +
+              Map-reduce partition columns:
+                    expr: key
+                    type: string
+              tag: 1
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          condition expressions:
+            0 {VALUE._col0} {VALUE._col1}
+            1 
+          handleSkewJoin: true
+          outputColumnNames: _col0, _col1
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: string
+                  expr: _col1
+                  type: string
+            outputColumnNames: _col0, _col1
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-6
+    Conditional Operator
+
+  Stage: Stage-7
+    Map Reduce Local Work
+      Alias -> Map Local Tables:
+        1 
+          Fetch Operator
+            limit: -1
+      Alias -> Map Local Operator Tree:
+        1 
+          TableScan
+            HashTable Sink Operator
+              condition expressions:
+                0 {0_VALUE_0} {0_VALUE_1}
+                1 
+              handleSkewJoin: false
+              keys:
+                0 [Column[joinkey0]]
+                1 [Column[joinkey0]]
+              Position of Big Table: 0
+
+  Stage: Stage-5
+    Map Reduce
+      Alias -> Map Operator Tree:
+        0 
+          TableScan
+            Map Join Operator
+              condition map:
+                   Inner Join 0 to 1
+              condition expressions:
+                0 {0_VALUE_0} {0_VALUE_1}
+                1 
+              handleSkewJoin: false
+              keys:
+                0 [Column[joinkey0]]
+                1 [Column[joinkey0]]
+              outputColumnNames: _col0, _col1
+              Position of Big Table: 0
+              Select Operator
+                expressions:
+                      expr: _col0
+                      type: string
+                      expr: _col1
+                      type: string
+                outputColumnNames: _col0, _col1
+                File Output Operator
+                  compressed: false
+                  GlobalTableId: 0
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+      Local Work:
+        Map Reduce Local Work
+
+  Stage: Stage-2
+    Map Reduce
+      Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+          TableScan
+            Reduce Output Operator
+              key expressions:
+                    expr: _col0
+                    type: string
+              sort order: +
+              tag: -1
+              value expressions:
+                    expr: _col0
+                    type: string
+                    expr: _col1
+                    type: string
+      Reduce Operator Tree:
+        Extract
+          Limit
+            File Output Operator
+              compressed: false
+              GlobalTableId: 1
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: default.noskew
+
+  Stage: Stage-0
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+  Stage: Stage-8
+      Create Table Operator:
+        Create Table
+          columns: key string, value string
+          if not exists: false
+          input format: org.apache.hadoop.mapred.TextInputFormat
+          # buckets: -1
+          output format: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
+          name: noskew
+          isExternal: false
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+PREHOOK: query: create table noskew as select a.* from src a join src b on a.key=b.key order by a.key limit 30
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+POSTHOOK: query: create table noskew as select a.* from src a join src b on a.key=b.key order by a.key limit 30
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@noskew
+PREHOOK: query: select * from noskew
+PREHOOK: type: QUERY
+PREHOOK: Input: default@noskew
+#### A masked pattern was here ####
+POSTHOOK: query: select * from noskew
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@noskew
+#### A masked pattern was here ####
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+0	val_0
+10	val_10
+100	val_100
+100	val_100
+100	val_100
+100	val_100
+103	val_103
+103	val_103
+103	val_103
+103	val_103
+104	val_104
+104	val_104
+104	val_104
+104	val_104
+105	val_105
+11	val_11
+111	val_111
+113	val_113
+113	val_113
+113	val_113
+113	val_113
+114	val_114
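
Note on the ConditionalResolverSkewJoin change: at run time the resolver looks at the big-key directories written by the common join and schedules the map-join task for each directory that actually received skewed rows. Before this patch, a join with no skewed keys produced an empty task list, so the stages hanging off the conditional (such as the order by/limit stage of the CTAS in skewjoin_noskew.q) were never launched. The sketch below reproduces that fallback decision in isolation; the Task class, the resolveTasks signature, and the stage names are simplified stand-ins invented for illustration, not Hive's actual API.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Simplified stand-ins for Hive's task machinery; nothing here is the
    // real org.apache.hadoop.hive.ql API.
    final class SkewResolverSketch {

      /** Minimal stand-in for Hive's Task<? extends Serializable>. */
      static final class Task {
        final String name;
        Task(String name) { this.name = name; }
        @Override public String toString() { return name; }
      }

      /**
       * Mirrors the patched resolver logic: pick the map-join task for every
       * big-key dir that holds data, else fall back to the no-skew task.
       */
      static List<Task> resolveTasks(Map<String, Long> dirSizes,
          Map<String, Task> dirToTaskMap, Task noSkewTask) {
        List<Task> resTsks = new ArrayList<Task>();
        for (Map.Entry<String, Task> e : dirToTaskMap.entrySet()) {
          Long size = dirSizes.get(e.getKey());
          if (size != null && size > 0) { // skewed keys were spilled here
            resTsks.add(e.getValue());
          }
        }
        // The fix: with no skewed keys, run the original child task instead
        // of returning an empty list and stranding the downstream stages.
        if (resTsks.isEmpty() && noSkewTask != null) {
          resTsks.add(noSkewTask);
        }
        return resTsks;
      }

      public static void main(String[] args) {
        Map<String, Task> dirToTaskMap = new HashMap<String, Task>();
        dirToTaskMap.put("/tmp/bigkeys_0", new Task("Stage-5 (skew map join)"));

        Map<String, Long> noSpill = new HashMap<String, Long>(); // no skew spilled
        Task child = new Task("Stage-2 (order by / limit)");

        // Prints [Stage-2 (order by / limit)]: the no-skew branch is chosen.
        System.out.println(resolveTasks(noSpill, dirToTaskMap, child));
      }
    }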
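
The GenMRSkewJoinProcessor half of the patch can be read the same way: the common-join task's single old child is re-parented under every skew map-join branch and is also appended to the conditional's candidate list, so the resolver has a no-skew branch to hand back. A minimal graph sketch of that rewiring, again with an invented Task stand-in rather than Hive's Task<? extends Serializable>:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical miniature of the rewiring in processSkewJoin(); only
    // addDependentTask() mimics the real Task API.
    final class SkewRewireSketch {
      static final class Task {
        final String name;
        final List<Task> childTasks = new ArrayList<Task>();
        Task(String name) { this.name = name; }
        void addDependentTask(Task t) {
          if (!childTasks.contains(t)) { childTasks.add(t); }
        }
        @Override public String toString() { return name; }
      }

      public static void main(String[] args) {
        Task currTask = new Task("Stage-1 (common join)");
        Task child = new Task("Stage-2 (order by)"); // the single old child
        currTask.addDependentTask(child);

        // One candidate branch per potentially skewed table, built earlier
        // in processSkewJoin().
        List<Task> listTasks = new ArrayList<Task>();
        listTasks.add(new Task("Stage-5 (skew map join)"));

        // Mirrors the patched code: the old child stays downstream of every
        // skew branch, and additionally becomes a branch of its own.
        for (Task tsk : listTasks) {
          tsk.addDependentTask(child);
        }
        listTasks.add(child);

        // currTask's children are then reset so the ConditionalTask built
        // from listTasks is its only child (ConditionalTask elided here).
        currTask.childTasks.clear();

        System.out.println(listTasks); // [Stage-5 (skew map join), Stage-2 (order by)]
      }
    }

This is also why the updated skewjoin.q.out and skewjoin_noskew.q.out plans list an extra stage (Stage-0 or Stage-2) in the conditional stage's "consists of" line: the no-skew child is now itself one of the branches the conditional can resolve to.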