diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 105463f..b65913c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -296,9 +296,6 @@ public static void processSkewJoin(JoinOperator joinOp, Task<? extends Serializable> currTask,
     List<Operator<? extends OperatorDesc>> tableScanParents =
         new ArrayList<Operator<? extends OperatorDesc>>();
     tableScanParents.add(tableScan);
     hashTableSinkOp.setParentOperators(tableScanParents);
+    hashTableSinkOp.setTag(tag);
   }
 
   private static void setMemUsage(MapJoinOperator mapJoinOp, Task<? extends Serializable> task,
@@ -412,8 +412,6 @@ private static void setMemUsage(MapJoinOperator mapJoinOp, Task<? extends Serializable> task,
-    for (BaseWork child : sparkWork.getChildren(work)) {
-      if (sparkWork.getChildren(child).size() > 1 || !supportRuntimeSkewJoin(sparkWork, child)) {
-        return false;
-      }
-    }
-    return true;
+    return sparkWork.getChildren(work).isEmpty();
   }
 }
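Note on the `supportRuntimeSkewJoin` hunk above: the recursive child walk is collapsed into a single leaf test, so runtime skew join is now attempted only when the join's work has no children in the SparkWork graph. A minimal, self-contained sketch of the two rules, assuming a toy string-keyed DAG (`WorkGraphSketch` is invented for illustration and is not Hive's `SparkWork` API; the "old" rule is reconstructed from the removed lines):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy DAG standing in for SparkWork; node names stand in for BaseWork.
class WorkGraphSketch {
  private final Map<String, List<String>> children = new HashMap<>();

  void addEdge(String parent, String child) {
    children.computeIfAbsent(parent, k -> new ArrayList<>()).add(child);
    children.computeIfAbsent(child, k -> new ArrayList<>());
  }

  List<String> getChildren(String work) {
    return children.getOrDefault(work, new ArrayList<>());
  }

  // Old rule (as reconstructed from the removed lines): walk all descendants
  // and reject any branching below the join's work.
  boolean supportedOld(String work) {
    for (String child : getChildren(work)) {
      if (getChildren(child).size() > 1 || !supportedOld(child)) {
        return false;
      }
    }
    return true;
  }

  // New rule from the patch: only a leaf work qualifies.
  boolean supportedNew(String work) {
    return getChildren(work).isEmpty();
  }

  public static void main(String[] args) {
    WorkGraphSketch g = new WorkGraphSketch();
    g.addEdge("join", "childA"); // a single linear child under the join
    System.out.println(g.supportedOld("join")); // true: no branching below
    System.out.println(g.supportedNew("join")); // false: "join" is not a leaf
  }
}
```

The new rule is strictly more conservative: whenever it returns true the old walk would have as well, but any join work with downstream works is now skipped outright.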
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
index 3445ae2..66de40e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
@@ -18,19 +18,10 @@
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
@@ -43,6 +34,9 @@
 import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -52,6 +46,17 @@
 import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
 public class SparkMapJoinResolver implements PhysicalPlanResolver {
 
   @Override
@@ -244,7 +249,8 @@ private void generateLocalWork(SparkTask originalTask) {
   // all the parent SparkTasks that this new task is depend on, if they don't already exists.
   private SparkTask createSparkTask(SparkTask originalTask, SparkWork sparkWork,
-      Map<SparkWork, SparkTask> createdTaskMap) {
+      Map<SparkWork, SparkTask> createdTaskMap,
+      ConditionalTask conditionalTask) {
     if (createdTaskMap.containsKey(sparkWork)) {
       return createdTaskMap.get(sparkWork);
     }
@@ -252,19 +258,27 @@ private SparkTask createSparkTask(SparkTask originalTask, SparkWork sparkWork,
         originalTask : (SparkTask) TaskFactory.get(sparkWork, physicalContext.conf);
     if (!dependencyGraph.get(sparkWork).isEmpty()) {
       for (SparkWork parentWork : dependencyGraph.get(sparkWork)) {
-        SparkTask parentTask = createSparkTask(originalTask, parentWork, createdTaskMap);
+        SparkTask parentTask =
+            createSparkTask(originalTask, parentWork, createdTaskMap, conditionalTask);
         parentTask.addDependentTask(resultTask);
       }
     } else {
       if (originalTask != resultTask) {
         List<Task<? extends Serializable>> parentTasks = originalTask.getParentTasks();
         if (parentTasks != null && parentTasks.size() > 0) {
+          // avoid concurrent modification
+          originalTask.setParentTasks(new ArrayList<Task<? extends Serializable>>());
           for (Task<? extends Serializable> parentTask : parentTasks) {
             parentTask.addDependentTask(resultTask);
+            parentTask.removeDependentTask(originalTask);
           }
         } else {
-          physicalContext.addToRootTask(resultTask);
-          physicalContext.removeFromRootTask(originalTask);
+          if (conditionalTask == null) {
+            physicalContext.addToRootTask(resultTask);
+            physicalContext.removeFromRootTask(originalTask);
+          } else {
+            updateConditionalTask(conditionalTask, originalTask, resultTask);
+          }
         }
       }
     }
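Note on the `// avoid concurrent modification` hunk above: `removeDependentTask()` mutates the child's parent list, so detaching while iterating `originalTask`'s own list would throw `ConcurrentModificationException`; the patch swaps in a fresh list before walking the old one. A self-contained sketch of the hazard and the fix, with a hypothetical `TaskStub` standing in for Hive's `Task` hierarchy:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in: parent/child links kept in plain lists, like Hive tasks.
class TaskStub {
  final String name;
  final List<TaskStub> parents = new ArrayList<>();
  final List<TaskStub> children = new ArrayList<>();

  TaskStub(String name) { this.name = name; }

  void addDependentTask(TaskStub child) {
    children.add(child);
    child.parents.add(this);
  }

  void removeDependentTask(TaskStub child) {
    children.remove(child);
    child.parents.remove(this); // mutates the child's parent list
  }

  public static void main(String[] args) {
    TaskStub parent = new TaskStub("parent");
    TaskStub original = new TaskStub("original");
    TaskStub replacement = new TaskStub("replacement");
    parent.addDependentTask(original);

    // Looping over original.parents directly would fail: each
    // removeDependentTask(original) edits the list being iterated.
    // Mirror the patch: keep the old list aside, reset the field, walk the copy.
    List<TaskStub> oldParents = new ArrayList<>(original.parents);
    original.parents.clear(); // analogous to setParentTasks(new ArrayList<...>())
    for (TaskStub p : oldParents) {
      p.addDependentTask(replacement);
      p.removeDependentTask(original); // safe: iterates the detached copy
    }
    System.out.println(replacement.parents.get(0).name); // prints "parent"
  }
}
```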
@@ -277,36 +291,91 @@ public Object dispatch(Node nd, Stack<Node> stack, Object... nos)
       throws SemanticException {
     Task<? extends Serializable> currentTask = (Task<? extends Serializable>) nd;
-    if (currentTask instanceof SparkTask) {
-      SparkTask sparkTask = (SparkTask) currentTask;
-      SparkWork sparkWork = sparkTask.getWork();
+    if(currentTask.isMapRedTask()) {
+      if (currentTask instanceof ConditionalTask) {
+        List<Task<? extends Serializable>> taskList =
+            ((ConditionalTask) currentTask).getListTasks();
+        for (Task<? extends Serializable> tsk : taskList) {
+          if (tsk instanceof SparkTask) {
+            processCurrentTask((SparkTask) tsk, (ConditionalTask) currentTask);
+          }
+        }
+      } else if (currentTask instanceof SparkTask) {
+        processCurrentTask((SparkTask) currentTask, null);
+      }
+    }
 
-      // Generate MapredLocalWorks for MJ and HTS
-      generateLocalWork(sparkTask);
+    return null;
+  }
 
-      dependencyGraph.put(sparkWork, new ArrayList<SparkWork>());
-      Set<BaseWork> leaves = sparkWork.getLeaves();
-      for (BaseWork leaf : leaves) {
-        moveWork(sparkWork, leaf, sparkWork);
-      }
+  private void processCurrentTask(SparkTask sparkTask, ConditionalTask conditionalTask) {
+    dependencyGraph.clear();
+    sparkWorkMap.clear();
+    SparkWork sparkWork = sparkTask.getWork();
 
-      // Now remove all BaseWorks in all the childSparkWorks that we created
-      // from the original SparkWork
-      for (SparkWork newSparkWork : sparkWorkMap.values()) {
-        for (BaseWork work : newSparkWork.getAllWorkUnsorted()) {
-          sparkWork.remove(work);
-        }
-      }
+    // Generate MapredLocalWorks for MJ and HTS
+    generateLocalWork(sparkTask);
 
-      Map<SparkWork, SparkTask> createdTaskMap = new LinkedHashMap<SparkWork, SparkTask>();
+    dependencyGraph.put(sparkWork, new ArrayList<SparkWork>());
+    Set<BaseWork> leaves = sparkWork.getLeaves();
+    for (BaseWork leaf : leaves) {
+      moveWork(sparkWork, leaf, sparkWork);
+    }
 
-      // Now create SparkTasks from the SparkWorks, also set up dependency
-      for (SparkWork work : dependencyGraph.keySet()) {
-        createSparkTask(sparkTask, work, createdTaskMap);
+    // Now remove all BaseWorks in all the childSparkWorks that we created
+    // from the original SparkWork
+    for (SparkWork newSparkWork : sparkWorkMap.values()) {
+      for (BaseWork work : newSparkWork.getAllWorkUnsorted()) {
+        sparkWork.remove(work);
       }
     }
 
-    return null;
+    Map<SparkWork, SparkTask> createdTaskMap = new LinkedHashMap<SparkWork, SparkTask>();
+
+    // Now create SparkTasks from the SparkWorks, also set up dependency
+    for (SparkWork work : dependencyGraph.keySet()) {
+      createSparkTask(sparkTask, work, createdTaskMap, conditionalTask);
+    }
+  }
+
+  private void updateConditionalTask(ConditionalTask conditionalTask,
+      SparkTask originalTask, SparkTask newTask) {
+    ConditionalWork conditionalWork = conditionalTask.getWork();
+    SparkWork originWork = originalTask.getWork();
+    SparkWork newWork = newTask.getWork();
+    List<Task<? extends Serializable>> listTask = conditionalTask.getListTasks();
+    List<Serializable> listWork = (List<Serializable>) conditionalWork.getListWorks();
+    int taskIndex = listTask.indexOf(originalTask);
+    int workIndex = listWork.indexOf(originWork);
+    if (taskIndex < 0 || workIndex < 0) {
+      return;
+    }
+    listTask.set(taskIndex, newTask);
+    listWork.set(workIndex, newWork);
+    ConditionalResolver resolver = conditionalTask.getResolver();
+    if (resolver instanceof ConditionalResolverSkewJoin) {
+      // get bigKeysDirToTaskMap
+      ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx context =
+          (ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx) conditionalTask
+              .getResolverCtx();
+      HashMap<Path, Task<? extends Serializable>> bigKeysDirToTaskMap = context
+          .getDirToTaskMap();
+      // to avoid concurrent modify the hashmap
+      HashMap<Path, Task<? extends Serializable>> newbigKeysDirToTaskMap =
+          new HashMap<Path, Task<? extends Serializable>>();
+      // reset the resolver
+      for (Map.Entry<Path, Task<? extends Serializable>> entry :
+          bigKeysDirToTaskMap.entrySet()) {
+        Task<? extends Serializable> task = entry.getValue();
+        Path bigKeyDir = entry.getKey();
+        if (task.equals(originalTask)) {
+          newbigKeysDirToTaskMap.put(bigKeyDir, newTask);
+        } else {
+          newbigKeysDirToTaskMap.put(bigKeyDir, task);
+        }
+      }
+      context.setDirToTaskMap(newbigKeysDirToTaskMap);
+    }
+  }
  }
 }
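Note on `updateConditionalTask()` above: rather than calling `put()` on the resolver context's map while reading it, the patch builds a replacement map and swaps it in with `setDirToTaskMap()`. The same rebuild idiom in isolation, with `String` stage names standing in for Hive tasks and `java.nio.file.Path` in place of `org.apache.hadoop.fs.Path`:

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

class ResolverMapRebuildSketch {
  // Returns a fresh map in which every entry that pointed at originalTask
  // now points at newTask; all other entries are carried over unchanged.
  static Map<Path, String> replaceTask(Map<Path, String> dirToTask,
                                       String originalTask, String newTask) {
    Map<Path, String> rebuilt = new HashMap<>();
    for (Map.Entry<Path, String> entry : dirToTask.entrySet()) {
      rebuilt.put(entry.getKey(),
          entry.getValue().equals(originalTask) ? newTask : entry.getValue());
    }
    return rebuilt;
  }

  public static void main(String[] args) {
    Map<Path, String> dirToTask = new HashMap<>();
    dirToTask.put(Paths.get("/tmp/bigkeys/dir1"), "Stage-1");
    dirToTask.put(Paths.get("/tmp/bigkeys/dir2"), "Stage-7");
    // Big-key directory dir1 is re-pointed from Stage-1 to its replacement.
    System.out.println(replaceTask(dirToTask, "Stage-1", "Stage-3"));
  }
}
```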
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 24e1460..d798b32 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -17,17 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.parse.spark;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -80,6 +69,17 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 
 /**
  * SparkCompiler translates the operator plan into SparkTasks.
  *
@@ -272,6 +272,12 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
 
     physicalCtx = new SplitSparkWorkResolver().resolve(physicalCtx);
 
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) {
+      (new SparkSkewJoinResolver()).resolve(physicalCtx);
+    } else {
+      LOG.debug("Skipping runtime skew join optimization");
+    }
+
     physicalCtx = new SparkMapJoinResolver().resolve(physicalCtx);
 
     if (conf.getBoolVar(HiveConf.ConfVars.HIVENULLSCANOPTIMIZE)) {
@@ -303,13 +309,6 @@ protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
     } else {
       LOG.debug("Skipping stage id rearranger");
     }
-
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) {
-      // TODO: enable after HIVE-8913 is done
-      //(new SparkSkewJoinResolver()).resolve(physicalCtx);
-    } else {
-      LOG.debug("Skipping runtime skew join optimization");
-    }
 
     return;
   }
 }
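Note on the `optimizeTaskPlan()` hunk above: `SparkSkewJoinResolver` moves from the tail of the method (where it sat commented out pending HIVE-8913) to just before `SparkMapJoinResolver`, so the map joins that skew-join resolution emits are still in the plan when map-join resolution runs over them. A toy pipeline showing why the order matters; the `Resolver` interface and both classes are stand-ins invented for the example, not Hive's API:

```java
import java.util.ArrayList;
import java.util.List;

interface Resolver {
  void resolve(List<String> plan);
}

// Stand-in for skew join resolution: emits a conditional stage with a map join.
class SkewJoinResolverSketch implements Resolver {
  public void resolve(List<String> plan) {
    plan.add("conditional-skew-mapjoin-stage");
  }
}

// Stand-in for map join resolution: attaches local work to every map-join
// stage currently in the plan.
class MapJoinResolverSketch implements Resolver {
  public void resolve(List<String> plan) {
    plan.replaceAll(s -> s.contains("mapjoin") ? s + "+local-work" : s);
  }
}

class ResolverOrderSketch {
  public static void main(String[] args) {
    List<String> plan = new ArrayList<>();
    plan.add("scan-stage");
    // Patched order: skew join first, then map join resolution sees its output.
    new SkewJoinResolverSketch().resolve(plan);
    new MapJoinResolverSketch().resolve(plan);
    System.out.println(plan); // [scan-stage, conditional-skew-mapjoin-stage+local-work]
  }
}
```

With the old order, stages produced by skew-join resolution would never be visited by the map-join resolver, which is also why the patch teaches `SparkMapJoinResolver` to walk the children of a `ConditionalTask`.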
diff --git ql/src/test/results/clientpositive/spark/auto_join_stats.q.out ql/src/test/results/clientpositive/spark/auto_join_stats.q.out
index 14c58fb..bccd246 100644
--- ql/src/test/results/clientpositive/spark/auto_join_stats.q.out
+++ ql/src/test/results/clientpositive/spark/auto_join_stats.q.out
@@ -182,8 +182,8 @@ POSTHOOK: query: explain select src1.key, src2.key, smalltable.key from src src1
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-1 depends on stages: Stage-2, Stage-3
   Stage-3 depends on stages: Stage-2
+  Stage-1 depends on stages: Stage-3
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
@@ -209,6 +209,28 @@ STAGE PLANS:
         Local Work:
           Map Reduce Local Work
 
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3
+            Map Operator Tree:
+                TableScan
+                  alias: src2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    Spark HashTable Sink Operator
+                      condition expressions:
+                        0 {key}
+                        1
+                      keys:
+                        0 key (type: string)
+                        1 key (type: string)
+            Local Work:
+              Map Reduce Local Work
+
   Stage: Stage-1
     Spark
       Edges:
@@ -301,28 +323,6 @@ STAGE PLANS:
                 output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
-  Stage: Stage-3
-    Spark
-#### A masked pattern was here ####
-      Vertices:
-        Map 3
-            Map Operator Tree:
-                TableScan
-                  alias: src2
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: key is not null (type: boolean)
-                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                    Spark HashTable Sink Operator
-                      condition expressions:
-                        0 {key}
-                        1
-                      keys:
-                        0 key (type: string)
-                        1 key (type: string)
-            Local Work:
-              Map Reduce Local Work
-
   Stage: Stage-0
     Fetch Operator
       limit: -1
diff --git ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
index 92c1fb7..e5473e9 100644
--- ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
+++ ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
@@ -314,8 +314,8 @@ join tab b on a.k1 = b.key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-1 depends on stages: Stage-2, Stage-3
   Stage-3 depends on stages: Stage-2
+  Stage-1 depends on stages: Stage-3
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
@@ -341,6 +341,28 @@ STAGE PLANS:
         Local Work:
           Map Reduce Local Work
 
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3
+            Map Operator Tree:
+                TableScan
+                  alias: tab
+                  Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+                    Spark HashTable Sink Operator
+                      condition expressions:
+                        0
+                        1 {value}
+                      keys:
+                        0 key (type: int)
+                        1 key (type: int)
+            Local Work:
+              Map Reduce Local Work
+
   Stage: Stage-1
     Spark
       Edges:
@@ -421,28 +443,6 @@ STAGE PLANS:
                 output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                 serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
-  Stage: Stage-3
-    Spark
-#### A masked pattern was here ####
-      Vertices:
-        Map 3
-            Map Operator Tree:
-                TableScan
-                  alias: tab
-                  Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: key is not null (type: boolean)
-                    Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
-                    Spark HashTable Sink Operator
-                      condition expressions:
-                        0
-                        1 {value}
-                      keys:
-                        0 key (type: int)
-                        1 key (type: int)
-            Local Work:
-              Map Reduce Local Work
-
   Stage: Stage-0
     Fetch Operator
       limit: -1
diff --git ql/src/test/results/clientpositive/spark/skewjoin.q.out ql/src/test/results/clientpositive/spark/skewjoin.q.out
index 56b78be..37b5ee5 100644
--- ql/src/test/results/clientpositive/spark/skewjoin.q.out
+++ ql/src/test/results/clientpositive/spark/skewjoin.q.out
@@ -80,7 +80,10 @@ INSERT OVERWRITE TABLE dest_j1 SELECT src1.key, src2.value
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
+  Stage-4 depends on stages: Stage-1 , consists of Stage-5, Stage-0
+  Stage-5
+  Stage-3 depends on stages: Stage-5
+  Stage-0 depends on stages: Stage-3
   Stage-2 depends on stages: Stage-0
 
 STAGE PLANS:
@@ -125,6 +128,7 @@ STAGE PLANS:
               condition expressions:
                 0 {KEY.reducesinkkey0}
                 1 {VALUE._col0}
+              handleSkewJoin: true
               outputColumnNames: _col0, _col6
               Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
               Select Operator
@@ -140,6 +144,56 @@ STAGE PLANS:
                     serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                     name: default.dest_j1
 
+  Stage: Stage-4
+    Conditional Operator
+
+  Stage: Stage-5
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 5
+            Map Operator Tree:
+                TableScan
+                  Spark HashTable Sink Operator
+                    condition expressions:
+                      0 {0_VALUE_0}
+                      1 {1_VALUE_0}
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 4
+            Map Operator Tree:
+                TableScan
+                  Map Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                    condition expressions:
+                      0 {0_VALUE_0}
+                      1 {1_VALUE_0}
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+                    outputColumnNames: _col0, _col6
+                    Select Operator
+                      expressions: UDFToInteger(_col0) (type: int), _col6 (type: string)
+                      outputColumnNames: _col0, _col1
+                      File Output Operator
+                        compressed: false
+                        table:
+                            input format: org.apache.hadoop.mapred.TextInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                            name: default.dest_j1
+            Local Work:
+              Map Reduce Local Work
+
   Stage: Stage-0
     Move Operator
       tables: