diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
index bcd3825..62f6d72 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
@@ -117,7 +117,7 @@ private void removeSparkPartitionPruningSink(BaseWork sourceWork, MapWork targetMapWork) {
         OperatorUtils.removeBranch(pruningSinkOp);
 
         // Remove all event source info from the target MapWork
-        String sourceWorkId = SparkUtilities.getWorkId(sourceWork);
+        String sourceWorkId = pruningSinkOp.getUniqueId();
         SparkPartitionPruningSinkDesc pruningSinkDesc = pruningSinkOp.getConf();
         targetMapWork.getEventSourceTableDescMap().get(sourceWorkId).remove(pruningSinkDesc.getTable());
         targetMapWork.getEventSourceColumnNameMap().get(sourceWorkId).remove(pruningSinkDesc.getTargetColumnName());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 988579e..6e502eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -85,7 +85,7 @@ public int compare(BaseWork o1, BaseWork o2) {
     };
 
     // maps from a work to the DPPs it contains
-    private Map<BaseWork, List<SparkPartitionPruningSinkDesc>> workToDpps = new HashMap<>();
+    private Map<BaseWork, List<SparkPartitionPruningSinkOperator>> workToDpps = new HashMap<>();
 
     @Override
     public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) throws SemanticException {
@@ -215,16 +215,15 @@ private void combineEquivalentDPPSinks(SparkPartitionPruningSinkDesc first,
       if (workSet.size() > 1) {
        Iterator<BaseWork> iterator = workSet.iterator();
         BaseWork first = iterator.next();
-        List<SparkPartitionPruningSinkDesc> dppList1 = workToDpps.get(first);
-        String firstId = SparkUtilities.getWorkId(first);
+        List<SparkPartitionPruningSinkOperator> dppList1 = workToDpps.get(first);
         while (iterator.hasNext()) {
           BaseWork next = iterator.next();
           if (dppList1 != null) {
-            List<SparkPartitionPruningSinkDesc> dppList2 = workToDpps.get(next);
+            List<SparkPartitionPruningSinkOperator> dppList2 = workToDpps.get(next);
             // equivalent works must have dpp lists of same size
             for (int i = 0; i < dppList1.size(); i++) {
-              combineEquivalentDPPSinks(dppList1.get(i), dppList2.get(i),
-                  firstId, SparkUtilities.getWorkId(next));
+              combineEquivalentDPPSinks(dppList1.get(i).getConf(), dppList2.get(i).getConf(),
+                  dppList1.get(i).getUniqueId(), dppList2.get(i).getUniqueId());
             }
           }
           replaceWork(next, first, sparkWork);
@@ -391,10 +390,11 @@ private boolean compareOperatorChain(Operator<?> firstOperator, Operator<?> secondOperator,
       }
 
       if (firstOperator instanceof SparkPartitionPruningSinkOperator) {
-        List<SparkPartitionPruningSinkDesc> dpps = workToDpps.computeIfAbsent(first, k -> new ArrayList<>());
-        dpps.add(((SparkPartitionPruningSinkOperator) firstOperator).getConf());
+        List<SparkPartitionPruningSinkOperator> dpps = workToDpps.computeIfAbsent(
+            first, k -> new ArrayList<>());
+        dpps.add(((SparkPartitionPruningSinkOperator) firstOperator));
         dpps = workToDpps.computeIfAbsent(second, k -> new ArrayList<>());
-        dpps.add(((SparkPartitionPruningSinkOperator) secondOperator).getConf());
+        dpps.add(((SparkPartitionPruningSinkOperator) secondOperator));
       }
       return true;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 604c8ae..f4ab3b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -464,16 +464,12 @@ public void processPartitionPruningSink(GenSparkProcContext context,
         targetWork != null,
         "No targetWork found for tablescan " + ts);
 
-    String targetId = SparkUtilities.getWorkId(targetWork);
-
-    BaseWork sourceWork = getEnclosingWork(pruningSink, context);
-    String sourceId = SparkUtilities.getWorkId(sourceWork);
+    String sourceId = pruningSink.getUniqueId();
 
     // set up temporary path to communicate between the small/big table
     Path tmpPath = targetWork.getTmpPathForPartitionPruning();
     if (tmpPath == null) {
-      Path baseTmpPath = context.parseContext.getContext().getMRTmpPath();
-      tmpPath = SparkUtilities.generateTmpPathForPartitionPruning(baseTmpPath, targetId);
+      tmpPath = getDPPOutputPath(context.parseContext.getContext());
       targetWork.setTmpPathForPartitionPruning(tmpPath);
       LOG.info("Setting tmp path between source work and target work:\n" + tmpPath);
     }
@@ -509,6 +505,10 @@ public void processPartitionPruningSink(GenSparkProcContext context,
     keys.add(desc.getTargetPartKey());
   }
 
+  private Path getDPPOutputPath(Context context) {
+    return new Path(context.getMRScratchDir(), "_dpp_output_");
+  }
+
   public static SparkEdgeProperty getEdgeProperty(HiveConf conf, ReduceSinkOperator reduceSink,
       ReduceWork reduceWork) throws SemanticException {
     boolean useSparkGroupBy = conf.getBoolVar(HiveConf.ConfVars.SPARK_USE_GROUPBY_SHUFFLE);
@@ -682,19 +682,4 @@ private static boolean hasGBYOperator(ReduceSinkOperator rs) {
     }
     return false;
   }
-
-  /**
-   * getEncosingWork finds the BaseWork any given operator belongs to.
-   */
-  public BaseWork getEnclosingWork(Operator<?> op, GenSparkProcContext procCtx) {
-    List<Operator<?>> ops = new ArrayList<Operator<?>>();
-    OperatorUtils.findRoots(op, ops);
-    for (Operator<?> r : ops) {
-      BaseWork work = procCtx.rootToWorkMap.get(r);
-      if (work != null) {
-        return work;
-      }
-    }
-    return null;
-  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
index bd9de09..50dc75a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@ -21,6 +21,7 @@
 import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -55,6 +56,9 @@
   protected transient Serializer serializer;
   protected transient DataOutputBuffer buffer;
   protected static final Logger LOG = LoggerFactory.getLogger(SparkPartitionPruningSinkOperator.class);
+  private static final AtomicLong sequenceNum = new AtomicLong(0);
+
+  private transient String uniqueId = null;
 
   /** Kryo ctor. */
   @VisibleForTesting
@@ -202,4 +206,11 @@ public static String getOperatorName() {
     return "SPARKPRUNINGSINK";
   }
 
+  public String getUniqueId() {
+    if (uniqueId == null) {
+      uniqueId = getOperatorId() + "_" + sequenceNum.getAndIncrement();
+    }
+    return uniqueId;
+  }
+
 }
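
The patch above stops keying the DPP event-source maps and the pruning tmp path by the enclosing work's id (SparkUtilities.getWorkId) and instead gives every SparkPartitionPruningSinkOperator its own id, built from the operator id plus a JVM-wide counter, so equivalent sinks stay distinguishable even after CombineEquivalentWorkResolver merges their enclosing works. A minimal standalone sketch of that id scheme follows; the class and the ids used in main are illustrative only, not Hive code.

import java.util.concurrent.atomic.AtomicLong;

class UniqueIdSketch {
  // one counter shared by every instance, mirroring sequenceNum in the patch
  private static final AtomicLong SEQ = new AtomicLong(0);

  private final String operatorId;
  private String uniqueId;  // built lazily, then cached

  UniqueIdSketch(String operatorId) {
    this.operatorId = operatorId;
  }

  String getUniqueId() {
    // the operator id alone can collide once equivalent works are combined;
    // appending the sequence number keeps each sink's id unique
    if (uniqueId == null) {
      uniqueId = operatorId + "_" + SEQ.getAndIncrement();
    }
    return uniqueId;
  }

  public static void main(String[] args) {
    UniqueIdSketch a = new UniqueIdSketch("SPARKPRUNINGSINK_10");
    UniqueIdSketch b = new UniqueIdSketch("SPARKPRUNINGSINK_10");
    // same operator id, distinct unique ids (e.g. ..._0 and ..._1)
    System.out.println(a.getUniqueId());
    System.out.println(b.getUniqueId());
  }
}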