diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 4bfc26f..819eef1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hive.ql.exec.MuxOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -338,6 +339,29 @@ private int getMapJoinConversionPos(JoinOperator joinOp, OptimizeSparkProcContex
         return -1;
       }
 
+      // Union is hard to handle. For instance, consider the following case:
+      //   TS     TS
+      //    |      |
+      //   FIL    FIL
+      //    |      |
+      //   SEL    SEL
+      //     \    /
+      //     UNION
+      //       |
+      //       RS
+      //       |
+      //      JOIN
+      // If we treat this as a MJ case, then after the RS is removed, we would
+      // create two MapWorks, one for each TS. Each of these MapWorks would contain
+      // a MJ operator, which is wrong.
+      // Alternatively, we could break the op tree at the UNION, and create two MapWorks
+      // for the branches above. The MJ would then be in the following ReduceWork.
+      // But this is tricky to implement, so we leave it as future work for now.
+      // TODO: handle this as a MJ case
+      if (containUnionWithoutRS(parentOp.getParentOperators().get(0))) {
+        return -1;
+      }
+
       long inputSize = currInputStat.getDataSize();
       if ((bigInputStat == null) ||
           ((bigInputStat != null) &&
@@ -420,19 +444,6 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeSparkProc
     Operator<? extends OperatorDesc> parentBigTableOp =
         mapJoinOp.getParentOperators().get(bigTablePosition);
     if (parentBigTableOp instanceof ReduceSinkOperator) {
-      for (Operator<?> p : parentBigTableOp.getParentOperators()) {
-        // we might have generated a dynamic partition operator chain. Since
-        // we're removing the reduce sink we need do remove that too.
-        Set<Operator<?>> dynamicPartitionOperators = new HashSet<Operator<?>>();
-        for (Operator<?> c : p.getChildOperators()) {
-          if (hasDynamicPartitionBroadcast(c)) {
-            dynamicPartitionOperators.add(c);
-          }
-        }
-        for (Operator<?> c : dynamicPartitionOperators) {
-          p.removeChild(c);
-        }
-      }
       mapJoinOp.getParentOperators().remove(bigTablePosition);
       if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) {
         mapJoinOp.getParentOperators().add(bigTablePosition,
@@ -450,31 +461,25 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeSparkProc
     return mapJoinOp;
   }
 
-  private boolean hasDynamicPartitionBroadcast(Operator<?> parent) {
-    boolean hasDynamicPartitionPruning = false;
-
-    for (Operator<?> op: parent.getChildOperators()) {
-      while (op != null) {
-        if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
-          // found dynamic partition pruning operator
-          hasDynamicPartitionPruning = true;
+  private boolean containUnionWithoutRS(Operator<? extends OperatorDesc> op) {
+    boolean result = false;
+    if (op instanceof UnionOperator) {
+      for (Operator<? extends OperatorDesc> pop : op.getParentOperators()) {
+        if (!(pop instanceof ReduceSinkOperator)) {
+          result = true;
           break;
         }
-
-        if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) {
-          // crossing reduce sink or file sink means the pruning isn't for this parent.
-          break;
-        }
-
-        if (op.getChildOperators().size() != 1) {
-          // dynamic partition pruning pipeline doesn't have multiple children
+      }
+    } else if (op instanceof ReduceSinkOperator) {
+      result = false;
+    } else {
+      for (Operator<? extends OperatorDesc> pop : op.getParentOperators()) {
+        if (containUnionWithoutRS(pop)) {
+          result = true;
           break;
         }
-
-        op = op.getChildOperators().get(0);
       }
     }
-
-    return hasDynamicPartitionPruning;
+    return result;
  }
 }
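
Note on the new check: containUnionWithoutRS walks up the operator tree from the join's parent. It returns true when it reaches a UnionOperator that has at least one non-ReduceSinkOperator parent, stops with false at any ReduceSinkOperator (an RS boundary starts a new work, so a union above it is harmless here), and otherwise recurses into all parents. The following standalone sketch reproduces that traversal on the exact tree drawn in the patch comment; the UnionCheckSketch, Op, and Kind names are made up for illustration only, since the real code traverses Hive's Operator DAG:

import java.util.Arrays;
import java.util.List;

public class UnionCheckSketch {
  enum Kind { TS, FIL, SEL, UNION, RS, JOIN }

  // Hypothetical stand-in for Hive's Operator: a node with parent links.
  static class Op {
    final Kind kind;
    final List<Op> parents;
    Op(Kind kind, Op... parents) {
      this.kind = kind;
      this.parents = Arrays.asList(parents);
    }
  }

  // Same shape as the patch's containUnionWithoutRS: walking upwards,
  // a UNION with any non-RS parent means "union not broken by an RS".
  static boolean containUnionWithoutRS(Op op) {
    if (op.kind == Kind.UNION) {
      for (Op pop : op.parents) {
        if (pop.kind != Kind.RS) {
          return true;
        }
      }
      return false;
    } else if (op.kind == Kind.RS) {
      // An RS boundary starts a new work; stop the upward walk here.
      return false;
    } else {
      for (Op pop : op.parents) {
        if (containUnionWithoutRS(pop)) {
          return true;
        }
      }
      return false;
    }
  }

  public static void main(String[] args) {
    // The tree from the patch comment: two TS->FIL->SEL branches feeding
    // a UNION, then an RS, then the JOIN.
    Op left  = new Op(Kind.SEL, new Op(Kind.FIL, new Op(Kind.TS)));
    Op right = new Op(Kind.SEL, new Op(Kind.FIL, new Op(Kind.TS)));
    Op union = new Op(Kind.UNION, left, right);
    Op rs    = new Op(Kind.RS, union);
    // getMapJoinConversionPos applies the check to the RS's parent (the
    // UNION), so this prints true and the join is not converted to a MJ.
    System.out.println(containUnionWithoutRS(rs.parents.get(0)));
  }
}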