diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties index 61d376a..1f975c8 100644 --- itests/src/test/resources/testconfiguration.properties +++ itests/src/test/resources/testconfiguration.properties @@ -803,6 +803,7 @@ spark.query.files=add_part_multiple.q, \ rcfile_bigdata.q, \ reduce_deduplicate_exclude_join.q, \ router_join_ppr.q, \ + runtime_skewjoin_mapjoin_spark.q, \ sample1.q, \ sample10.q, \ sample2.q, \ diff --git ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java index 9e4612d..f22694b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java +++ ql/src/java/org/apache/hadoop/hive/ql/lib/PreOrderWalker.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.lib; +import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.parse.SemanticException; /** @@ -58,6 +60,12 @@ public void walk(Node nd) throws SemanticException { for (Node n : nd.getChildren()) { walk(n); } + } else if (nd instanceof ConditionalTask) { + for (Task n : ((ConditionalTask) nd).getListTasks()) { + if (n.getParentTasks() == null || n.getParentTasks().isEmpty()) { + walk(n); + } + } } opStack.pop(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java index 52e6be8..f3fb541 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java @@ -335,6 +335,7 @@ public static void processSkewJoin(JoinOperator joinOp, Task currTask, HiveConf hiveConf) { - List> children = currTask.getChildTasks(); - return GenMRSkewJoinProcessor.skewJoinEnabled(hiveConf, joinOp) - && (children == null || children.size() <= 1); - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java index 69004dc..8e56263 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java @@ -59,6 +59,9 @@ public class SparkMapJoinResolver implements PhysicalPlanResolver { + // prevents a task from being processed multiple times + private final Set> visitedTasks = new HashSet<>(); + @Override public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { @@ -78,6 +81,15 @@ private boolean containsOp(BaseWork work, Class clazz) { return matchingOps != null && !matchingOps.isEmpty(); } + private boolean containsOp(SparkWork sparkWork, Class clazz) { + for (BaseWork work : sparkWork.getAllWorkUnsorted()) { + if (containsOp(work, clazz)) { + return true; + } + } + return false; + } + public static Set> getOp(BaseWork work, Class clazz) { Set> ops = new HashSet>(); if (work instanceof MapWork) { @@ -172,70 +184,68 @@ private void moveWork(SparkWork sparkWork, BaseWork work, SparkWork targetWork) private void generateLocalWork(SparkTask originalTask) { SparkWork originalWork = originalTask.getWork(); - Collection allBaseWorks = originalWork.getAllWorkUnsorted(); - - for (BaseWork work : allBaseWorks) { - if (containsOp(work, SparkHashTableSinkOperator.class) || - containsOp(work, MapJoinOperator.class)) { - 
work.setMapRedLocalWork(new MapredLocalWork()); - } - } - + Collection allBaseWorks = originalWork.getAllWork(); Context ctx = physicalContext.getContext(); for (BaseWork work : allBaseWorks) { - Set> ops = getOp(work, MapJoinOperator.class); - if (ops == null || ops.isEmpty()) { - continue; - } - Path tmpPath = Utilities.generateTmpPath(ctx.getMRTmpPath(), originalTask.getId()); - MapredLocalWork bigTableLocalWork = work.getMapRedLocalWork(); - List> dummyOps = - new ArrayList>(work.getDummyOps()); - bigTableLocalWork.setDummyParentOp(dummyOps); - bigTableLocalWork.setTmpPath(tmpPath); - - // In one work, only one map join operator can be bucketed - SparkBucketMapJoinContext bucketMJCxt = null; - for (Operator op: ops) { - MapJoinOperator mapJoinOp = (MapJoinOperator) op; - MapJoinDesc mapJoinDesc = mapJoinOp.getConf(); - if (mapJoinDesc.isBucketMapJoin()) { - bucketMJCxt = new SparkBucketMapJoinContext(mapJoinDesc); - bucketMJCxt.setBucketMatcherClass( - org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class); - bucketMJCxt.setPosToAliasMap(mapJoinOp.getPosToAliasMap()); - ((MapWork) work).setUseBucketizedHiveInputFormat(true); - bigTableLocalWork.setBucketMapjoinContext(bucketMJCxt); - bigTableLocalWork.setInputFileChangeSensitive(true); - break; + if (work.getMapRedLocalWork() == null) { + if (containsOp(work, SparkHashTableSinkOperator.class) || + containsOp(work, MapJoinOperator.class)) { + work.setMapRedLocalWork(new MapredLocalWork()); } - } - - for (BaseWork parentWork : originalWork.getParents(work)) { - Set> hashTableSinkOps = - getOp(parentWork, SparkHashTableSinkOperator.class); - if (hashTableSinkOps == null || hashTableSinkOps.isEmpty()) { + Set> ops = getOp(work, MapJoinOperator.class); + if (ops == null || ops.isEmpty()) { continue; } - MapredLocalWork parentLocalWork = parentWork.getMapRedLocalWork(); - parentLocalWork.setTmpHDFSPath(tmpPath); - if (bucketMJCxt != null) { - // We only need to update the work with the hashtable - // sink operator with the same mapjoin desc. We can tell - // that by comparing the bucket file name mapping map - // instance. They should be exactly the same one due to - // the way how the bucket mapjoin context is constructed. 
- for (Operator op: hashTableSinkOps) { - SparkHashTableSinkOperator hashTableSinkOp = (SparkHashTableSinkOperator) op; - SparkHashTableSinkDesc hashTableSinkDesc = hashTableSinkOp.getConf(); - BucketMapJoinContext original = hashTableSinkDesc.getBucketMapjoinContext(); - if (original != null && original.getBucketFileNameMapping() - == bucketMJCxt.getBucketFileNameMapping()) { - ((MapWork) parentWork).setUseBucketizedHiveInputFormat(true); - parentLocalWork.setBucketMapjoinContext(bucketMJCxt); - parentLocalWork.setInputFileChangeSensitive(true); - break; + Path tmpPath = Utilities.generateTmpPath(ctx.getMRTmpPath(), originalTask.getId()); + MapredLocalWork bigTableLocalWork = work.getMapRedLocalWork(); + List> dummyOps = + new ArrayList>(work.getDummyOps()); + bigTableLocalWork.setDummyParentOp(dummyOps); + bigTableLocalWork.setTmpPath(tmpPath); + + // In one work, only one map join operator can be bucketed + SparkBucketMapJoinContext bucketMJCxt = null; + for (Operator op : ops) { + MapJoinOperator mapJoinOp = (MapJoinOperator) op; + MapJoinDesc mapJoinDesc = mapJoinOp.getConf(); + if (mapJoinDesc.isBucketMapJoin()) { + bucketMJCxt = new SparkBucketMapJoinContext(mapJoinDesc); + bucketMJCxt.setBucketMatcherClass( + org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class); + bucketMJCxt.setPosToAliasMap(mapJoinOp.getPosToAliasMap()); + ((MapWork) work).setUseBucketizedHiveInputFormat(true); + bigTableLocalWork.setBucketMapjoinContext(bucketMJCxt); + bigTableLocalWork.setInputFileChangeSensitive(true); + break; + } + } + + for (BaseWork parentWork : originalWork.getParents(work)) { + Set> hashTableSinkOps = + getOp(parentWork, SparkHashTableSinkOperator.class); + if (hashTableSinkOps == null || hashTableSinkOps.isEmpty()) { + continue; + } + MapredLocalWork parentLocalWork = parentWork.getMapRedLocalWork(); + parentLocalWork.setTmpHDFSPath(tmpPath); + if (bucketMJCxt != null) { + // We only need to update the work with the hashtable + // sink operator with the same mapjoin desc. We can tell + // that by comparing the bucket file name mapping map + // instance. They should be exactly the same one due to + // the way how the bucket mapjoin context is constructed. + for (Operator op : hashTableSinkOps) { + SparkHashTableSinkOperator hashTableSinkOp = (SparkHashTableSinkOperator) op; + SparkHashTableSinkDesc hashTableSinkDesc = hashTableSinkOp.getConf(); + BucketMapJoinContext original = hashTableSinkDesc.getBucketMapjoinContext(); + if (original != null && original.getBucketFileNameMapping() + == bucketMJCxt.getBucketFileNameMapping()) { + ((MapWork) parentWork).setUseBucketizedHiveInputFormat(true); + parentLocalWork.setBucketMapjoinContext(bucketMJCxt); + parentLocalWork.setInputFileChangeSensitive(true); + break; + } } } } @@ -296,10 +306,12 @@ public Object dispatch(Node nd, Stack stack, Object... nos) for (Task tsk : taskList) { if (tsk instanceof SparkTask) { processCurrentTask((SparkTask) tsk, (ConditionalTask) currentTask); + visitedTasks.add(tsk); } } } else if (currentTask instanceof SparkTask) { processCurrentTask((SparkTask) currentTask, null); + visitedTasks.add(currentTask); } } @@ -312,32 +324,47 @@ public Object dispatch(Node nd, Stack stack, Object... nos) * wrapped in its task list. 
*/ private void processCurrentTask(SparkTask sparkTask, ConditionalTask conditionalTask) { - dependencyGraph.clear(); - sparkWorkMap.clear(); SparkWork sparkWork = sparkTask.getWork(); + if (!visitedTasks.contains(sparkTask)) { + dependencyGraph.clear(); + sparkWorkMap.clear(); - // Generate MapredLocalWorks for MJ and HTS - generateLocalWork(sparkTask); + // Generate MapredLocalWorks for MJ and HTS + generateLocalWork(sparkTask); - dependencyGraph.put(sparkWork, new ArrayList()); - Set leaves = sparkWork.getLeaves(); - for (BaseWork leaf : leaves) { - moveWork(sparkWork, leaf, sparkWork); - } + dependencyGraph.put(sparkWork, new ArrayList()); + Set leaves = sparkWork.getLeaves(); + for (BaseWork leaf : leaves) { + moveWork(sparkWork, leaf, sparkWork); + } - // Now remove all BaseWorks in all the childSparkWorks that we created - // from the original SparkWork - for (SparkWork newSparkWork : sparkWorkMap.values()) { - for (BaseWork work : newSparkWork.getAllWorkUnsorted()) { - sparkWork.remove(work); + // Now remove all BaseWorks in all the childSparkWorks that we created + // from the original SparkWork + for (SparkWork newSparkWork : sparkWorkMap.values()) { + for (BaseWork work : newSparkWork.getAllWorkUnsorted()) { + sparkWork.remove(work); + } } - } - Map createdTaskMap = new LinkedHashMap(); + Map createdTaskMap = new LinkedHashMap(); - // Now create SparkTasks from the SparkWorks, also set up dependency - for (SparkWork work : dependencyGraph.keySet()) { - createSparkTask(sparkTask, work, createdTaskMap, conditionalTask); + // Now create SparkTasks from the SparkWorks, also set up dependency + for (SparkWork work : dependencyGraph.keySet()) { + createSparkTask(sparkTask, work, createdTaskMap, conditionalTask); + } + } else if (conditionalTask != null) { + // We may need to update the conditional task's list. This happens when a common map join + // task exists in the task list and has already been processed. In such a case, + // the current task is the map join task and we need to replace it with + // its parent, i.e. the small table task. 
+ if (sparkTask.getParentTasks() != null && sparkTask.getParentTasks().size() == 1 && + sparkTask.getParentTasks().get(0) instanceof SparkTask) { + SparkTask parent = (SparkTask) sparkTask.getParentTasks().get(0); + if (containsOp(sparkWork, MapJoinOperator.class) && + containsOp(parent.getWork(), SparkHashTableSinkOperator.class)) { + updateConditionalTask(conditionalTask, sparkTask, parent); + } + } } } @@ -382,6 +409,10 @@ private void updateConditionalTask(ConditionalTask conditionalTask, } } context.setDirToTaskMap(newbigKeysDirToTaskMap); + // update no skew task + if (context.getNoSkewTask() != null && context.getNoSkewTask().equals(originalTask)) { + context.setNoSkewTask(newTask); + } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java index 4c2f42e..5990d17 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.CommonJoinOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; @@ -34,6 +35,7 @@ import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; +import org.apache.hadoop.hive.ql.optimizer.physical.GenMRSkewJoinProcessor; import org.apache.hadoop.hive.ql.optimizer.physical.GenSparkSkewJoinProcessor; import org.apache.hadoop.hive.ql.optimizer.physical.SkewJoinProcFactory; import org.apache.hadoop.hive.ql.optimizer.physical.SparkMapJoinResolver; @@ -50,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import java.io.Serializable; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.Stack; @@ -58,6 +61,9 @@ * Spark-version of SkewJoinProcFactory. 
*/ public class SparkSkewJoinProcFactory { + // let's remember the join operators we have processed + private static final Set visitedJoinOp = new HashSet(); + private SparkSkewJoinProcFactory() { // prevent instantiation } @@ -81,13 +87,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, JoinOperator op = (JoinOperator) nd; ReduceWork reduceWork = context.getReducerToReduceWork().get(op); ParseContext parseContext = context.getParseCtx(); - if (!op.getConf().isFixedAsSorted() && currentTsk instanceof SparkTask - && reduceWork != null && ((SparkTask) currentTsk).getWork().contains(reduceWork) - && GenSparkSkewJoinProcessor.supportRuntimeSkewJoin( - op, currentTsk, parseContext.getConf())) { + if (reduceWork != null && !visitedJoinOp.contains(op) && + supportRuntimeSkewJoin(op, reduceWork, currentTsk, parseContext.getConf())) { // first we try to split the task splitTask((SparkTask) currentTsk, reduceWork, parseContext); GenSparkSkewJoinProcessor.processSkewJoin(op, currentTsk, reduceWork, parseContext); + visitedJoinOp.add(op); } return null; } @@ -112,8 +117,7 @@ private static void splitTask(SparkTask currentTask, ReduceWork reduceWork, SparkWork newWork = new SparkWork(parseContext.getConf().getVar(HiveConf.ConfVars.HIVEQUERYID)); newWork.add(childWork); - copyWorkGraph(currentWork, newWork, childWork, true); - copyWorkGraph(currentWork, newWork, childWork, false); + copyWorkGraph(currentWork, newWork, childWork); // remove them from current spark work for (BaseWork baseWork : newWork.getAllWorkUnsorted()) { currentWork.remove(baseWork); @@ -196,22 +200,39 @@ private static boolean canSplit(SparkWork sparkWork) { /** * Copy a sub-graph from originWork to newWork. */ - private static void copyWorkGraph(SparkWork originWork, SparkWork newWork, - BaseWork baseWork, boolean upWards) { - if (upWards) { - for (BaseWork parent : originWork.getParents(baseWork)) { - newWork.add(parent); - SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(parent, baseWork); - newWork.connect(parent, baseWork, edgeProperty); - copyWorkGraph(originWork, newWork, parent, true); - } - } else { - for (BaseWork child : originWork.getChildren(baseWork)) { + private static void copyWorkGraph(SparkWork originWork, SparkWork newWork, BaseWork baseWork) { + for (BaseWork child : originWork.getChildren(baseWork)) { + if (!newWork.contains(child)) { newWork.add(child); SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(baseWork, child); newWork.connect(baseWork, child, edgeProperty); - copyWorkGraph(originWork, newWork, child, false); + copyWorkGraph(originWork, newWork, child); } } + for (BaseWork parent : originWork.getParents(baseWork)) { + if (!newWork.contains(parent)) { + newWork.add(parent); + SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(parent, baseWork); + newWork.connect(parent, baseWork, edgeProperty); + copyWorkGraph(originWork, newWork, parent); + } + } + } + + public static Set getVisitedJoinOp() { + return visitedJoinOp; + } + + private static boolean supportRuntimeSkewJoin(JoinOperator joinOp, ReduceWork reduceWork, + Task currTask, HiveConf hiveConf) { + if (currTask instanceof SparkTask && + GenMRSkewJoinProcessor.skewJoinEnabled(hiveConf, joinOp)) { + SparkWork sparkWork = ((SparkTask) currTask).getWork(); + List> children = currTask.getChildTasks(); + return !joinOp.getConf().isFixedAsSorted() && sparkWork.contains(reduceWork) && + (children == null || children.size() <= 1) && + SparkMapJoinResolver.getOp(reduceWork, 
CommonJoinOperator.class).size() == 1; + } + return false; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java index 984380d..608a0de 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.lib.GraphWalker; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.PreOrderWalker; import org.apache.hadoop.hive.ql.lib.Rule; import org.apache.hadoop.hive.ql.lib.RuleRegExp; import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext; @@ -53,8 +54,10 @@ public class SparkSkewJoinResolver implements PhysicalPlanResolver { @Override public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { + SparkSkewJoinProcFactory.getVisitedJoinOp().clear(); Dispatcher disp = new SparkSkewJoinTaskDispatcher(pctx); - GraphWalker ogw = new DefaultGraphWalker(disp); + // since we may split current task, use a pre-order walker + GraphWalker ogw = new PreOrderWalker(disp); ArrayList topNodes = new ArrayList(); topNodes.addAll(pctx.getRootTasks()); ogw.startWalking(topNodes, null); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java index a7c896d..8500b21 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java @@ -236,6 +236,7 @@ public void remove(BaseWork work) { List parents = getParents(work); for (BaseWork w: children) { + edgeProperties.remove(new ImmutablePair(work, w)); invertedWorkGraph.get(w).remove(work); if (invertedWorkGraph.get(w).size() == 0) { roots.add(w); @@ -243,6 +244,7 @@ public void remove(BaseWork work) { } for (BaseWork w: parents) { + edgeProperties.remove(new ImmutablePair(w, work)); workGraph.get(w).remove(work); if (workGraph.get(w).size() == 0) { leaves.add(w); diff --git ql/src/test/queries/clientpositive/runtime_skewjoin_mapjoin_spark.q ql/src/test/queries/clientpositive/runtime_skewjoin_mapjoin_spark.q new file mode 100644 index 0000000..1888005 --- /dev/null +++ ql/src/test/queries/clientpositive/runtime_skewjoin_mapjoin_spark.q @@ -0,0 +1,24 @@ +set hive.optimize.skewjoin = true; +set hive.skewjoin.key = 4; +set hive.auto.convert.join=true; +set hive.auto.convert.join.noconditionaltask=true; +set hive.auto.convert.join.noconditionaltask.size=50; + +-- This is mainly intended for spark, to test runtime skew join together with map join + +CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE; + +LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1; + +EXPLAIN +SELECT COUNT(*) FROM + (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a +JOIN + (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b +ON a.key=b.key; + +SELECT COUNT(*) FROM + (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a +JOIN + (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b +ON a.key=b.key; diff --git ql/src/test/results/clientpositive/spark/runtime_skewjoin_mapjoin_spark.q.out ql/src/test/results/clientpositive/spark/runtime_skewjoin_mapjoin_spark.q.out new file mode 100644 index 0000000..2348058 --- /dev/null +++ 
ql/src/test/results/clientpositive/spark/runtime_skewjoin_mapjoin_spark.q.out @@ -0,0 +1,314 @@ +PREHOOK: query: -- This is mainly intended for spark, to test runtime skew join together with map join + +CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@T1 +POSTHOOK: query: -- This is mainly intended for spark, to test runtime skew join together with map join + +CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@T1 +PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1 +PREHOOK: type: LOAD +#### A masked pattern was here #### +PREHOOK: Output: default@t1 +POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/T1.txt' INTO TABLE T1 +POSTHOOK: type: LOAD +#### A masked pattern was here #### +POSTHOOK: Output: default@t1 +PREHOOK: query: EXPLAIN +SELECT COUNT(*) FROM + (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a +JOIN + (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b +ON a.key=b.key +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +SELECT COUNT(*) FROM + (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a +JOIN + (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b +ON a.key=b.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-7 depends on stages: Stage-1 , consists of Stage-10, Stage-9 + Stage-10 + Stage-6 depends on stages: Stage-10 + Stage-9 depends on stages: Stage-6 + Stage-5 depends on stages: Stage-9 + Stage-4 depends on stages: Stage-5 , consists of Stage-8, Stage-2 + Stage-8 + Stage-3 depends on stages: Stage-8 + Stage-2 depends on stages: Stage-3 + Stage-0 depends on stages: Stage-2 + +STAGE PLANS: + Stage: Stage-1 + Spark + Edges: + Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 5 (PARTITION-LEVEL SORT, 2) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: src1 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Map 5 + Map Operator Tree: + TableScan + alias: src2 + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: key (type: string) + sort order: + + Map-reduce partition columns: key (type: string) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + handleSkewJoin: true + keys: + 0 key (type: string) + 1 key (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: 
org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-7 + Conditional Operator + + Stage: Stage-10 + Spark +#### A masked pattern was here #### + Vertices: + Map 13 + Map Operator Tree: + TableScan + Spark HashTable Sink Operator + keys: + 0 reducesinkkey0 (type: string) + 1 reducesinkkey0 (type: string) + Local Work: + Map Reduce Local Work + + Stage: Stage-6 + Spark +#### A masked pattern was here #### + Vertices: + Map 12 + Map Operator Tree: + TableScan + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 reducesinkkey0 (type: string) + 1 reducesinkkey0 (type: string) + outputColumnNames: _col0 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-9 + Spark +#### A masked pattern was here #### + Vertices: + Map 7 + Map Operator Tree: + TableScan + alias: t1 + Statistics: Num rows: 0 Data size: 30 Basic stats: PARTIAL Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE + Spark HashTable Sink Operator + keys: + 0 key (type: string) + 1 key (type: string) + Local Work: + Map Reduce Local Work + + Stage: Stage-5 + Spark + Edges: + Reducer 3 <- Map 11 (PARTITION-LEVEL SORT, 2), Map 6 (PARTITION-LEVEL SORT, 2) +#### A masked pattern was here #### + Vertices: + Map 11 + Map Operator Tree: + TableScan + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Map 6 + Map Operator Tree: + TableScan + alias: src + Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 key (type: string) + 1 key (type: string) + outputColumnNames: _col0 + input vertices: + 1 Map 7 + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + Local Work: + Map Reduce Local Work + Reducer 3 + Reduce Operator Tree: + Join Operator + condition map: + Inner Join 0 to 1 + handleSkewJoin: true + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + Statistics: Num rows: 302 Data size: 3213 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-4 + Conditional Operator + + Stage: Stage-8 + Spark +#### A masked pattern was here #### + Vertices: + Map 10 + Map Operator Tree: + TableScan + Spark HashTable Sink Operator + keys: + 0 reducesinkkey0 (type: string) + 1 
reducesinkkey0 (type: string) + Local Work: + Map Reduce Local Work + + Stage: Stage-3 + Spark +#### A masked pattern was here #### + Vertices: + Map 9 + Map Operator Tree: + TableScan + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 reducesinkkey0 (type: string) + 1 reducesinkkey0 (type: string) + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-2 + Spark + Edges: + Reducer 4 <- Map 8 (GROUP, 1) +#### A masked pattern was here #### + Vertices: + Map 8 + Map Operator Tree: + TableScan + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 4 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT COUNT(*) FROM + (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a +JOIN + (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b +ON a.key=b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Input: default@t1 +#### A masked pattern was here #### +POSTHOOK: query: SELECT COUNT(*) FROM + (SELECT src1.key,src1.value FROM src src1 JOIN src src2 ON src1.key=src2.key) a +JOIN + (SELECT src.key,src.value FROM src JOIN T1 ON src.key=T1.key) b +ON a.key=b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Input: default@t1 +#### A masked pattern was here #### +3
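
Both guard sets introduced by this patch serve the same purpose: once SparkSkewJoinResolver walks the task graph with a PreOrderWalker, and that walker now also descends into a ConditionalTask's list tasks, the same SparkTask or JoinOperator can be dispatched more than once, so each processor remembers what it has already handled. The stand-alone sketch below illustrates the idiom with hypothetical names rather than the classes touched by this patch:

import java.util.HashSet;
import java.util.Set;

final class VisitOnceGuard<T> {
  // records nodes that have already been processed during the walk
  private final Set<T> seen = new HashSet<>();

  // Set.add returns true only for the first occurrence, so callers can write
  // if (guard.firstVisit(task)) { process(task); }
  boolean firstVisit(T node) {
    return seen.add(node);
  }
}

In the patch itself, SparkMapJoinResolver adds each processed task to visitedTasks, SparkSkewJoinProcFactory records handled join operators in visitedJoinOp, and SparkSkewJoinResolver clears that set at the start of each resolve pass.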
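
The reworked copyWorkGraph follows both parent and child edges from the starting BaseWork and uses membership in the target SparkWork as its visited check. The sketch below restates only that traversal over a generic directed graph; the names and the Map-based adjacency are assumptions made for illustration, and edge copying is omitted.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ComponentCopy {
  // Copies every node reachable from 'start', in either direction, into 'target'.
  // Set.add doubles as the visited test, so the mutual recursion terminates on a DAG.
  static <N> void copy(Map<N, List<N>> children, Map<N, List<N>> parents,
                       Set<N> target, N start) {
    if (!target.add(start)) {
      return; // already copied via another path
    }
    for (N child : children.getOrDefault(start, Collections.emptyList())) {
      copy(children, parents, target, child);
    }
    for (N parent : parents.getOrDefault(start, Collections.emptyList())) {
      copy(children, parents, target, parent);
    }
  }
}

In the patch the membership test is newWork.contains(work), so the split-off SparkWork receives the whole connected component around the child work rather than only its upward or downward chain.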