diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index c78ea54..596e487 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -983,7 +983,7 @@ public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSc * @param parseCtx * @return The TableScanOperator inserted before child. */ - protected static TableScanOperator createTemporaryFile( + public static TableScanOperator createTemporaryFile( Operator parent, Operator child, Path taskTmpDir, TableDesc tt_desc, ParseContext parseCtx) { diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java index b65913c..9f54916 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java @@ -82,22 +82,13 @@ private GenSparkSkewJoinProcessor() { public static void processSkewJoin(JoinOperator joinOp, Task currTask, ReduceWork reduceWork, ParseContext parseCtx) throws SemanticException { - // We are trying to adding map joins to handle skew keys, and map join right - // now does not work with outer joins - if (!GenMRSkewJoinProcessor.skewJoinEnabled(parseCtx.getConf(), joinOp) || - !(currTask instanceof SparkTask)) { - return; - } SparkWork currentWork = ((SparkTask) currTask).getWork(); - if (!supportRuntimeSkewJoin(currentWork, reduceWork)) { + if (currentWork.getChildren(reduceWork).size() > 0) { + LOG.warn("Skip runtime skew join as the ReduceWork has child work and hasn't been split."); return; } List> children = currTask.getChildTasks(); - if (children != null && children.size() > 1) { - LOG.warn("Skip runtime skew join as current task has multiple children."); - return; - } Task child = children != null && children.size() == 1 ? 
children.get(0) : null; @@ -424,17 +415,11 @@ private static void setMemUsage(MapJoinOperator mapJoinOp, Task currTask, HiveConf hiveConf) { + List> children = currTask.getChildTasks(); + return GenMRSkewJoinProcessor.skewJoinEnabled(hiveConf, joinOp) && + (children == null || children.size() <= 1); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java index 4a8e9b9..18d1545 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java @@ -78,7 +78,7 @@ private boolean containsOp(BaseWork work, Class clazz) { return matchingOps != null && !matchingOps.isEmpty(); } - private Set> getOp(BaseWork work, Class clazz) { + public static Set> getOp(BaseWork work, Class clazz) { Set> ops = new HashSet>(); if (work instanceof MapWork) { Collection> opSet = ((MapWork) work).getAliasToWork().values(); diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java index 6a9c308..77a447e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java @@ -18,18 +18,41 @@ package org.apache.hadoop.hive.ql.optimizer.spark; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; import org.apache.hadoop.hive.ql.optimizer.physical.GenSparkSkewJoinProcessor; import org.apache.hadoop.hive.ql.optimizer.physical.SkewJoinProcFactory; +import org.apache.hadoop.hive.ql.optimizer.physical.SparkMapJoinResolver; import org.apache.hadoop.hive.ql.parse.ParseContext; +import org.apache.hadoop.hive.ql.parse.QBJoinTree; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; +import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.plan.TableDesc; import java.io.Serializable; +import java.util.List; +import java.util.Set; import java.util.Stack; /** @@ -55,17 +78,136 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) throws SemanticException { SparkSkewJoinResolver.SparkSkewJoinProcCtx context = (SparkSkewJoinResolver.SparkSkewJoinProcCtx) procCtx; + Task currentTsk = context.getCurrentTask(); JoinOperator op = (JoinOperator) nd; - if (op.getConf().isFixedAsSorted()) { - return null; - } + ReduceWork reduceWork = context.getReducerToReduceWork().get(op); ParseContext parseContext = context.getParseCtx(); - Task currentTsk = context.getCurrentTask(); - if (currentTsk instanceof SparkTask) { - GenSparkSkewJoinProcessor.processSkewJoin(op, currentTsk, - context.getReducerToReduceWork().get(op), parseContext); + if (!op.getConf().isFixedAsSorted() && currentTsk instanceof SparkTask && + reduceWork != null && ((SparkTask) currentTsk).getWork().contains(reduceWork) && + GenSparkSkewJoinProcessor.supportRuntimeSkewJoin( + op, currentTsk, parseContext.getConf())) { + // first we try to split the task + splitTask((SparkTask) currentTsk, reduceWork, parseContext); + GenSparkSkewJoinProcessor.processSkewJoin(op, currentTsk, reduceWork, parseContext); } return null; } } + + /** + * If the join is not in a leaf ReduceWork, the spark task has to be split into 2 tasks. + */ + private static void splitTask(SparkTask currentTask, ReduceWork reduceWork, + ParseContext parseContext) throws SemanticException { + SparkWork currentWork = currentTask.getWork(); + Set> reduceSinkSet = + SparkMapJoinResolver.getOp(reduceWork, ReduceSinkOperator.class); + if (currentWork.getChildren(reduceWork).size() == 1 && canSplit(currentWork) && + reduceSinkSet.size() == 1) { + ReduceSinkOperator reduceSink = (ReduceSinkOperator) reduceSinkSet.iterator().next(); + BaseWork childWork = currentWork.getChildren(reduceWork).get(0); + SparkEdgeProperty originEdge = currentWork.getEdgeProperty(reduceWork, childWork); + // disconnect the reduce work from its child. this should produce two isolated sub graphs + currentWork.disconnect(reduceWork, childWork); + // move works following the current reduce work into a new spark work + SparkWork newWork = + new SparkWork(parseContext.getConf().getVar(HiveConf.ConfVars.HIVEQUERYID)); + newWork.add(childWork); + copyWorkGraph(currentWork, newWork, childWork, true); + copyWorkGraph(currentWork, newWork, childWork, false); + // remove them from current spark work + for (BaseWork baseWork : newWork.getAllWorkUnsorted()) { + currentWork.remove(baseWork); + // TODO: take care of cloneToWork? 
+ currentWork.getCloneToWork().remove(baseWork); + } + // create TS to read intermediate data + Context baseCtx = parseContext.getContext(); + Path taskTmpDir = baseCtx.getMRTmpPath(); + Operator rsParent = reduceSink.getParentOperators().get(0); + TableDesc tableDesc = PlanUtils.getIntermediateFileTableDesc(PlanUtils + .getFieldSchemasFromRowSchema(rsParent.getSchema(), "temporarycol")); + // this will insert FS and TS between the RS and its parent + TableScanOperator tableScanOp = GenMapRedUtils.createTemporaryFile( + rsParent, reduceSink, taskTmpDir, tableDesc, parseContext); + // create new MapWork + MapWork mapWork = PlanUtils.getMapRedWork().getMapWork(); + mapWork.setName("Map " + GenSparkUtils.getUtils().getNextSeqNumber()); + newWork.add(mapWork); + newWork.connect(mapWork, childWork, originEdge); + // set up the new map work + String streamDesc = taskTmpDir.toUri().toString(); + if (GenMapRedUtils.needsTagging((ReduceWork) childWork)) { + Operator childReducer = ((ReduceWork) childWork).getReducer(); + QBJoinTree joinTree = null; + if (childReducer instanceof JoinOperator) { + joinTree = parseContext.getJoinContext().get(childReducer); + } else if (childReducer instanceof MapJoinOperator) { + joinTree = parseContext.getMapJoinContext().get(childReducer); + } else if (childReducer instanceof SMBMapJoinOperator) { + joinTree = parseContext.getSmbMapJoinContext().get(childReducer); + } + if (joinTree != null && joinTree.getId() != null) { + streamDesc = joinTree.getId() + ":$INTNAME"; + } else { + streamDesc = "$INTNAME"; + } + // TODO: remove this? + String origStreamDesc = streamDesc; + int pos = 0; + while (mapWork.getAliasToWork().get(streamDesc) != null) { + streamDesc = origStreamDesc.concat(String.valueOf(++pos)); + } + } + GenMapRedUtils.setTaskPlan(taskTmpDir.toUri().toString(), streamDesc, + tableScanOp, mapWork, false, tableDesc); + // insert the new task between current task and its child + Task newTask = TaskFactory.get(newWork, parseContext.getConf()); + List> childTasks = currentTask.getChildTasks(); + // must have at most one child + if (childTasks != null && childTasks.size() > 0) { + Task childTask = childTasks.get(0); + currentTask.removeDependentTask(childTask); + newTask.addDependentTask(childTask); + } + currentTask.addDependentTask(newTask); + newTask.setFetchSource(currentTask.isFetchSource()); + } + } + + /** + * Whether we can split at reduceWork. For simplicity, let's require that each work can + * have at most one child work.
This may be relaxed by checking the connectivity of the + * work graph after disconnecting the current reduce work from its child. + */ + private static boolean canSplit(SparkWork sparkWork) { + for (BaseWork baseWork : sparkWork.getAllWorkUnsorted()) { + if (sparkWork.getChildren(baseWork).size() > 1) { + return false; + } + } + return true; + } + + /** + * Copy a sub-graph from originWork to newWork. + */ + private static void copyWorkGraph(SparkWork originWork, SparkWork newWork, + BaseWork baseWork, boolean upWards) { + if (upWards) { + for (BaseWork parent : originWork.getParents(baseWork)) { + newWork.add(parent); + SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(parent, baseWork); + newWork.connect(parent, baseWork, edgeProperty); + copyWorkGraph(originWork, newWork, parent, true); + } + } else { + for (BaseWork child : originWork.getChildren(baseWork)) { + newWork.add(child); + SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(baseWork, child); + newWork.connect(baseWork, child, edgeProperty); + copyWorkGraph(originWork, newWork, child, false); + } + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java index 9b180f1..354c78c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java @@ -41,8 +41,10 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Stack; @@ -75,9 +77,6 @@ public Object dispatch(Node nd, Stack stack, Object... nodeOutputs) Task task = (Task) nd; if (task instanceof SparkTask) { SparkWork sparkWork = ((SparkTask) task).getWork(); - if (sparkWork.getAllReduceWork().isEmpty()) { - return null; - } SparkSkewJoinProcCtx skewJoinProcCtx = new SparkSkewJoinProcCtx(task, physicalContext.getParseContext()); Map opRules = new LinkedHashMap(); @@ -87,7 +86,10 @@ public Object dispatch(Node nd, Stack stack, Object...
nodeOutputs) SparkSkewJoinProcFactory.getDefaultProc(), opRules, skewJoinProcCtx); GraphWalker ogw = new DefaultGraphWalker(disp); ArrayList topNodes = new ArrayList(); - for (ReduceWork reduceWork : sparkWork.getAllReduceWork()) { + // since we may need to split the task, let's walk the graph bottom-up + List reduceWorkList = sparkWork.getAllReduceWork(); + Collections.reverse(reduceWorkList); + for (ReduceWork reduceWork : reduceWorkList) { topNodes.add(reduceWork.getReducer()); skewJoinProcCtx.getReducerToReduceWork().put(reduceWork.getReducer(), reduceWork); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java index 275d567..99539c2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java @@ -175,6 +175,7 @@ public void disconnect(BaseWork a, BaseWork b) { if (getChildren(a).isEmpty()) { leaves.add(a); } + edgeProperties.remove(new ImmutablePair(a, b)); } /** @@ -397,7 +398,7 @@ public String toString() { return result; } - // get all reduce works in this spark work + // get all reduce works in this spark work in sorted order public List getAllReduceWork() { List result = new ArrayList(); for (BaseWork work : getAllWork()) { diff --git ql/src/test/results/clientpositive/spark/skewjoin.q.out ql/src/test/results/clientpositive/spark/skewjoin.q.out index de12b46..0e53d12 100644 --- ql/src/test/results/clientpositive/spark/skewjoin.q.out +++ ql/src/test/results/clientpositive/spark/skewjoin.q.out @@ -607,14 +607,17 @@ SELECT sum(hash(Y.key)), sum(hash(Y.value)) POSTHOOK: type: QUERY STAGE DEPENDENCIES: Stage-1 is a root stage - Stage-0 depends on stages: Stage-1 + Stage-4 depends on stages: Stage-1 , consists of Stage-5, Stage-2 + Stage-5 + Stage-3 depends on stages: Stage-5 + Stage-2 depends on stages: Stage-3 + Stage-0 depends on stages: Stage-2 STAGE PLANS: Stage: Stage-1 Spark Edges: Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 3), Map 4 (PARTITION-LEVEL SORT, 3) - Reducer 3 <- Reducer 2 (GROUP, 1) #### A masked pattern was here #### Vertices: Map 1 @@ -660,6 +663,7 @@ STAGE PLANS: condition expressions: 0 1 {KEY.reducesinkkey0} {VALUE._col0} + handleSkewJoin: true outputColumnNames: _col2, _col3 Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE Group By Operator @@ -667,6 +671,72 @@ STAGE PLANS: mode: hash outputColumnNames: _col0, _col1 Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-4 + Conditional Operator + + Stage: Stage-5 + Spark +#### A masked pattern was here #### + Vertices: + Map 7 + Map Operator Tree: + TableScan + Spark HashTable Sink Operator + condition expressions: + 0 + 1 {1_VALUE_0} {1_VALUE_1} + keys: + 0 reducesinkkey0 (type: string) + 1 reducesinkkey0 (type: string) + Local Work: + Map Reduce Local Work + + Stage: Stage-3 + Spark +#### A masked pattern was here #### + Vertices: + Map 6 + Map Operator Tree: + TableScan + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 {1_VALUE_0} {1_VALUE_1} + keys: + 0 reducesinkkey0 (type: string) + 1 reducesinkkey0 (type: string) + outputColumnNames: _col2, _col3 + Group By Operator + aggregations: 
sum(hash(_col2)), sum(hash(_col3)) + mode: hash + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-2 + Spark + Edges: + Reducer 3 <- Map 5 (GROUP, 1) +#### A masked pattern was here #### + Vertices: + Map 5 + Map Operator Tree: + TableScan Reduce Output Operator sort order: Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE @@ -731,14 +801,17 @@ SELECT sum(hash(Y.key)), sum(hash(Y.value)) POSTHOOK: type: QUERY STAGE DEPENDENCIES: Stage-1 is a root stage - Stage-0 depends on stages: Stage-1 + Stage-4 depends on stages: Stage-1 , consists of Stage-5, Stage-2 + Stage-5 + Stage-3 depends on stages: Stage-5 + Stage-2 depends on stages: Stage-3 + Stage-0 depends on stages: Stage-2 STAGE PLANS: Stage: Stage-1 Spark Edges: Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 3), Map 4 (PARTITION-LEVEL SORT, 3) - Reducer 3 <- Reducer 2 (GROUP, 1) #### A masked pattern was here #### Vertices: Map 1 @@ -784,6 +857,7 @@ STAGE PLANS: condition expressions: 0 1 {KEY.reducesinkkey0} {VALUE._col0} + handleSkewJoin: true outputColumnNames: _col2, _col3 Statistics: Num rows: 137 Data size: 1460 Basic stats: COMPLETE Column stats: NONE Group By Operator @@ -791,6 +865,72 @@ STAGE PLANS: mode: hash outputColumnNames: _col0, _col1 Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-4 + Conditional Operator + + Stage: Stage-5 + Spark +#### A masked pattern was here #### + Vertices: + Map 7 + Map Operator Tree: + TableScan + Spark HashTable Sink Operator + condition expressions: + 0 + 1 {1_VALUE_0} {1_VALUE_1} + keys: + 0 reducesinkkey0 (type: string), reducesinkkey1 (type: double) + 1 reducesinkkey0 (type: string), reducesinkkey1 (type: double) + Local Work: + Map Reduce Local Work + + Stage: Stage-3 + Spark +#### A masked pattern was here #### + Vertices: + Map 6 + Map Operator Tree: + TableScan + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 + 1 {1_VALUE_0} {1_VALUE_1} + keys: + 0 reducesinkkey0 (type: string), reducesinkkey1 (type: double) + 1 reducesinkkey0 (type: string), reducesinkkey1 (type: double) + outputColumnNames: _col2, _col3 + Group By Operator + aggregations: sum(hash(_col2)), sum(hash(_col3)) + mode: hash + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-2 + Spark + Edges: + Reducer 3 <- Map 5 (GROUP, 1) +#### A masked pattern was here #### + Vertices: + Map 5 + Map Operator Tree: + TableScan Reduce Output Operator sort order: Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE @@ -863,14 +1003,19 @@ ON src1.c1 = src3.c5 AND src3.c5 < 80 POSTHOOK: type: QUERY STAGE DEPENDENCIES: Stage-1 is a root stage - Stage-0 depends on stages: 
Stage-1 + Stage-5 depends on stages: Stage-1 , consists of Stage-6, Stage-7, Stage-2 + Stage-6 + Stage-3 depends on stages: Stage-6 + Stage-2 depends on stages: Stage-3, Stage-4 + Stage-7 + Stage-4 depends on stages: Stage-7 + Stage-0 depends on stages: Stage-2 STAGE PLANS: Stage: Stage-1 Spark Edges: Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 3), Map 4 (PARTITION-LEVEL SORT, 3), Map 5 (PARTITION-LEVEL SORT, 3) - Reducer 3 <- Reducer 2 (GROUP, 1) #### A masked pattern was here #### Vertices: Map 1 @@ -935,6 +1080,7 @@ STAGE PLANS: 0 {KEY.reducesinkkey0} 1 {VALUE._col0} 2 + handleSkewJoin: true outputColumnNames: _col0, _col3 Statistics: Num rows: 121 Data size: 1284 Basic stats: COMPLETE Column stats: NONE Group By Operator @@ -942,6 +1088,91 @@ STAGE PLANS: mode: hash outputColumnNames: _col0, _col1 Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-5 + Conditional Operator + + Stage: Stage-6 + Spark +#### A masked pattern was here #### + Vertices: + Map 8 + Map Operator Tree: + TableScan + Spark HashTable Sink Operator + condition expressions: + 0 {0_VALUE_0} + 1 {1_VALUE_0} + 2 + keys: + 0 reducesinkkey0 (type: string) + 1 reducesinkkey0 (type: string) + 2 reducesinkkey0 (type: string) + Local Work: + Map Reduce Local Work + Map 9 + Map Operator Tree: + TableScan + Spark HashTable Sink Operator + condition expressions: + 0 {0_VALUE_0} + 1 {1_VALUE_0} + 2 + keys: + 0 reducesinkkey0 (type: string) + 1 reducesinkkey0 (type: string) + 2 reducesinkkey0 (type: string) + Local Work: + Map Reduce Local Work + + Stage: Stage-3 + Spark +#### A masked pattern was here #### + Vertices: + Map 7 + Map Operator Tree: + TableScan + Map Join Operator + condition map: + Inner Join 0 to 1 + Inner Join 0 to 2 + condition expressions: + 0 {0_VALUE_0} + 1 {1_VALUE_0} + 2 + keys: + 0 reducesinkkey0 (type: string) + 1 reducesinkkey0 (type: string) + 2 reducesinkkey0 (type: string) + outputColumnNames: _col0, _col3 + Group By Operator + aggregations: sum(hash(_col0)), sum(hash(_col3)) + mode: hash + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-2 + Spark + Edges: + Reducer 3 <- Map 6 (GROUP, 1) +#### A masked pattern was here #### + Vertices: + Map 6 + Map Operator Tree: + TableScan Reduce Output Operator sort order: Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE @@ -965,6 +1196,72 @@ STAGE PLANS: output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Stage: Stage-7 + Spark +#### A masked pattern was here #### + Vertices: + Map 10 + Map Operator Tree: + TableScan + Spark HashTable Sink Operator + condition expressions: + 0 {0_VALUE_0} + 1 {1_VALUE_0} + 2 + keys: + 0 reducesinkkey0 (type: string) + 1 reducesinkkey0 (type: string) + 2 reducesinkkey0 (type: string) + Local Work: + Map Reduce Local Work + Map 12 + Map Operator Tree: + TableScan + Spark HashTable Sink Operator + condition expressions: + 0 {0_VALUE_0} 
+ 1 {1_VALUE_0} + 2 + keys: + 0 reducesinkkey0 (type: string) + 1 reducesinkkey0 (type: string) + 2 reducesinkkey0 (type: string) + Local Work: + Map Reduce Local Work + + Stage: Stage-4 + Spark +#### A masked pattern was here #### + Vertices: + Map 11 + Map Operator Tree: + TableScan + Map Join Operator + condition map: + Inner Join 0 to 1 + Inner Join 0 to 2 + condition expressions: + 0 {0_VALUE_0} + 1 {1_VALUE_0} + 2 + keys: + 0 reducesinkkey0 (type: string) + 1 reducesinkkey0 (type: string) + 2 reducesinkkey0 (type: string) + outputColumnNames: _col0, _col3 + Group By Operator + aggregations: sum(hash(_col0)), sum(hash(_col3)) + mode: hash + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + Stage: Stage-0 Fetch Operator limit: -1 diff --git ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out index a6ac032..1ad078a 100644 --- ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out +++ ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out @@ -6,16 +6,19 @@ create table noskew as select a.* from src a join src b on a.key=b.key order by POSTHOOK: type: CREATETABLE_AS_SELECT STAGE DEPENDENCIES: Stage-1 is a root stage - Stage-0 depends on stages: Stage-1 - Stage-3 depends on stages: Stage-0 - Stage-2 depends on stages: Stage-3 + Stage-5 depends on stages: Stage-1 , consists of Stage-6, Stage-3 + Stage-6 + Stage-4 depends on stages: Stage-6 + Stage-3 depends on stages: Stage-4 + Stage-0 depends on stages: Stage-3 + Stage-7 depends on stages: Stage-0 + Stage-2 depends on stages: Stage-7 STAGE PLANS: Stage: Stage-1 Spark Edges: Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 1), Map 4 (PARTITION-LEVEL SORT, 1) - Reducer 3 <- Reducer 2 (SORT, 1) #### A masked pattern was here #### Vertices: Map 1 @@ -53,13 +56,76 @@ STAGE PLANS: condition expressions: 0 {KEY.reducesinkkey0} {VALUE._col0} 1 + handleSkewJoin: true outputColumnNames: _col0, _col1 Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: string) - sort order: + - Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE - value expressions: _col1 (type: string) + File Output Operator + compressed: false + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + + Stage: Stage-5 + Conditional Operator + + Stage: Stage-6 + Spark +#### A masked pattern was here #### + Vertices: + Map 7 + Map Operator Tree: + TableScan + Spark HashTable Sink Operator + condition expressions: + 0 {0_VALUE_0} {0_VALUE_1} + 1 + keys: + 0 reducesinkkey0 (type: string) + 1 reducesinkkey0 (type: string) + Local Work: + Map Reduce Local Work + + Stage: Stage-4 + Spark +#### A masked pattern was here #### + Vertices: + Map 6 + Map Operator Tree: + TableScan + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {0_VALUE_0} {0_VALUE_1} + 1 + keys: + 0 reducesinkkey0 (type: string) + 1 reducesinkkey0 (type: string) + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + table: + input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe + Local Work: + Map Reduce Local Work + + Stage: Stage-3 + Spark + Edges: + Reducer 3 <- Map 5 (SORT, 1) +#### A masked pattern was here #### + Vertices: + Map 5 + Map Operator Tree: + TableScan + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE + value expressions: _col1 (type: string) Reducer 3 Reduce Operator Tree: Select Operator @@ -84,7 +150,7 @@ STAGE PLANS: hdfs directory: true #### A masked pattern was here #### - Stage: Stage-3 + Stage: Stage-7 Create Table Operator: Create Table columns: key string, value string
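
Reviewer note (not part of the patch): the core of the change is SparkSkewJoinProcFactory.splitTask()/copyWorkGraph() above. When the skew join sits in a non-leaf ReduceWork, the SparkWork is split at that ReduceWork: the edge to its single child is disconnected, everything reachable from the child is moved into a new SparkWork, and the two resulting tasks are bridged through an intermediate file (the FS/TS pair inserted via GenMapRedUtils.createTemporaryFile), which is why the explain output now shows the extra Stage-2/Stage-3 Spark stages. The sketch below is a minimal, self-contained illustration of that graph split only; the Work type and the split/copy helpers are hypothetical stand-ins rather than Hive classes, and edge properties plus the intermediate-file bridge are omitted.

// Simplified sketch of the sub-graph split performed by splitTask()/copyWorkGraph().
import java.util.*;

public class WorkGraphSplitSketch {

  static class Work {
    final String name;
    final List<Work> parents = new ArrayList<>();
    final List<Work> children = new ArrayList<>();
    Work(String name) { this.name = name; }
    @Override public String toString() { return name; }
  }

  // Disconnect 'reduceWork' from its single child and collect everything reachable
  // from that child (walking both parents and children), the way copyWorkGraph()
  // is called once upwards and once downwards in the patch.
  static Set<Work> split(Work reduceWork) {
    if (reduceWork.children.size() != 1) {
      return Collections.emptySet();      // only split at a work with exactly one child
    }
    Work child = reduceWork.children.get(0);
    reduceWork.children.remove(child);    // analogous to currentWork.disconnect(reduceWork, childWork)
    child.parents.remove(reduceWork);
    Set<Work> newGraph = new LinkedHashSet<>();
    copy(child, newGraph);
    return newGraph;                      // these works move into the new SparkWork
  }

  static void copy(Work work, Set<Work> newGraph) {
    if (!newGraph.add(work)) {
      return;                             // already copied
    }
    for (Work parent : work.parents) {    // copyWorkGraph(..., upWards = true)
      copy(parent, newGraph);
    }
    for (Work childWork : work.children) { // copyWorkGraph(..., upWards = false)
      copy(childWork, newGraph);
    }
  }

  public static void main(String[] args) {
    // Map 1 -> Reducer 2 (skew join) -> Reducer 3, mirroring the explain output above.
    Work map1 = new Work("Map 1");
    Work reducer2 = new Work("Reducer 2");
    Work reducer3 = new Work("Reducer 3");
    map1.children.add(reducer2); reducer2.parents.add(map1);
    reducer2.children.add(reducer3); reducer3.parents.add(reducer2);
    System.out.println("moved to new SparkWork: " + split(reducer2)); // [Reducer 3]
  }
}

This also motivates the canSplit() restriction in the patch: requiring at most one child per work guarantees that disconnecting the ReduceWork from its child leaves two cleanly separated sub-graphs.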