diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
index bcd3825..62f6d72 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
@@ -117,7 +117,7 @@ private void removeSparkPartitionPruningSink(BaseWork sourceWork, MapWork target
     OperatorUtils.removeBranch(pruningSinkOp);
     // Remove all event source info from the target MapWork
-    String sourceWorkId = SparkUtilities.getWorkId(sourceWork);
+    String sourceWorkId = pruningSinkOp.getUniqueId();
     SparkPartitionPruningSinkDesc pruningSinkDesc = pruningSinkOp.getConf();
     targetMapWork.getEventSourceTableDescMap().get(sourceWorkId).remove(pruningSinkDesc.getTable());
     targetMapWork.getEventSourceColumnNameMap().get(sourceWorkId).remove(pruningSinkDesc.getTargetColumnName());
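Note: the hunk above keys the cleanup of the target MapWork's event-source maps by the pruning sink's own unique id instead of the id of the work that contains it. The snippet below is a standalone, simplified sketch of why that matters (plain Java, made-up ids, not the Hive API): when one source work holds several pruning sinks aimed at the same target, a per-work key forces them to share one registration, while a per-sink key lets each registration be added and removed independently.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone illustration, not Hive code. Two DPP sinks live in the same
// source work ("Reducer 2") and prune different columns of the same target.
public class DppKeyingSketch {
  public static void main(String[] args) {
    // Keyed by source work id: both sinks collapse onto a single entry, so
    // removing that entry drops both sinks' registrations at once.
    Map<String, List<String>> byWorkId = new HashMap<>();
    byWorkId.computeIfAbsent("Reducer 2", k -> new ArrayList<>()).add("p");
    byWorkId.computeIfAbsent("Reducer 2", k -> new ArrayList<>()).add("q");
    byWorkId.remove("Reducer 2");
    System.out.println(byWorkId);   // {}

    // Keyed by a per-sink unique id: each sink owns its entry and can be
    // removed without touching the other sink's registration.
    Map<String, List<String>> bySinkId = new HashMap<>();
    bySinkId.computeIfAbsent("SPARKPRUNINGSINK_0", k -> new ArrayList<>()).add("p");
    bySinkId.computeIfAbsent("SPARKPRUNINGSINK_1", k -> new ArrayList<>()).add("q");
    bySinkId.remove("SPARKPRUNINGSINK_0");
    System.out.println(bySinkId);   // {SPARKPRUNINGSINK_1=[q]}
  }
}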
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 6500682..c9dc261 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -41,6 +41,8 @@ import org.apache.commons.lang.ArrayUtils;
 import org.apache.calcite.util.Pair;
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
@@ -4635,6 +4637,10 @@ private static VectorPTFInfo createVectorPTFInfo(Operator
-  private Map<BaseWork, List<SparkPartitionPruningSinkDesc>> workToDpps = new HashMap<>();
+  private Map<BaseWork, List<SparkPartitionPruningSinkOperator>> workToDpps = new HashMap<>();
 
   @Override
   public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) throws SemanticException {
@@ -215,16 +215,15 @@ private void combineEquivalentDPPSinks(SparkPartitionPruningSinkDesc first,
     if (workSet.size() > 1) {
       Iterator<BaseWork> iterator = workSet.iterator();
       BaseWork first = iterator.next();
-      List<SparkPartitionPruningSinkDesc> dppList1 = workToDpps.get(first);
-      String firstId = SparkUtilities.getWorkId(first);
+      List<SparkPartitionPruningSinkOperator> dppList1 = workToDpps.get(first);
       while (iterator.hasNext()) {
         BaseWork next = iterator.next();
         if (dppList1 != null) {
-          List<SparkPartitionPruningSinkDesc> dppList2 = workToDpps.get(next);
+          List<SparkPartitionPruningSinkOperator> dppList2 = workToDpps.get(next);
           // equivalent works must have dpp lists of same size
           for (int i = 0; i < dppList1.size(); i++) {
-            combineEquivalentDPPSinks(dppList1.get(i), dppList2.get(i),
-                firstId, SparkUtilities.getWorkId(next));
+            combineEquivalentDPPSinks(dppList1.get(i).getConf(), dppList2.get(i).getConf(),
+                dppList1.get(i).getUniqueId(), dppList2.get(i).getUniqueId());
           }
         }
         replaceWork(next, first, sparkWork);
@@ -391,10 +390,11 @@ private boolean compareOperatorChain(Operator firstOperator, Operator seco
       }
 
       if (firstOperator instanceof SparkPartitionPruningSinkOperator) {
-        List<SparkPartitionPruningSinkDesc> dpps = workToDpps.computeIfAbsent(first, k -> new ArrayList<>());
-        dpps.add(((SparkPartitionPruningSinkOperator) firstOperator).getConf());
+        List<SparkPartitionPruningSinkOperator> dpps = workToDpps.computeIfAbsent(
+            first, k -> new ArrayList<>());
+        dpps.add(((SparkPartitionPruningSinkOperator) firstOperator));
         dpps = workToDpps.computeIfAbsent(second, k -> new ArrayList<>());
-        dpps.add(((SparkPartitionPruningSinkOperator) secondOperator).getConf());
+        dpps.add(((SparkPartitionPruningSinkOperator) secondOperator));
       }
       return true;
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 604c8ae..f4ab3b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -464,16 +464,12 @@ public void processPartitionPruningSink(GenSparkProcContext context,
         targetWork != null, "No targetWork found for tablescan " + ts);
 
-    String targetId = SparkUtilities.getWorkId(targetWork);
-
-    BaseWork sourceWork = getEnclosingWork(pruningSink, context);
-    String sourceId = SparkUtilities.getWorkId(sourceWork);
+    String sourceId = pruningSink.getUniqueId();
 
     // set up temporary path to communicate between the small/big table
     Path tmpPath = targetWork.getTmpPathForPartitionPruning();
     if (tmpPath == null) {
-      Path baseTmpPath = context.parseContext.getContext().getMRTmpPath();
-      tmpPath = SparkUtilities.generateTmpPathForPartitionPruning(baseTmpPath, targetId);
+      tmpPath = getDPPOutputPath(context.parseContext.getContext());
       targetWork.setTmpPathForPartitionPruning(tmpPath);
       LOG.info("Setting tmp path between source work and target work:\n" + tmpPath);
     }
@@ -509,6 +505,10 @@ public void processPartitionPruningSink(GenSparkProcContext context,
     keys.add(desc.getTargetPartKey());
   }
 
+  private Path getDPPOutputPath(Context context) {
+    return new Path(context.getMRScratchDir(), "_dpp_output_");
+  }
+
   public static SparkEdgeProperty getEdgeProperty(HiveConf conf, ReduceSinkOperator reduceSink,
       ReduceWork reduceWork) throws SemanticException {
     boolean useSparkGroupBy = conf.getBoolVar(HiveConf.ConfVars.SPARK_USE_GROUPBY_SHUFFLE);
@@ -682,19 +682,4 @@ private static boolean hasGBYOperator(ReduceSinkOperator rs) {
     }
     return false;
   }
-
-  /**
-   * getEncosingWork finds the BaseWork any given operator belongs to.
-   */
-  public BaseWork getEnclosingWork(Operator<?> op, GenSparkProcContext procCtx) {
-    List<Operator<?>> ops = new ArrayList<Operator<?>>();
-    OperatorUtils.findRoots(op, ops);
-    for (Operator<?> r : ops) {
-      BaseWork work = procCtx.rootToWorkMap.get(r);
-      if (work != null) {
-        return work;
-      }
-    }
-    return null;
-  }
 }
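Note: with the GenSparkUtils change above, every target work shares one DPP output root, <MR scratch dir>/_dpp_output_, and what distinguishes producers is the per-sink source id obtained from pruningSink.getUniqueId(). The snippet below is only a sketch of how per-sink subpaths can be composed with Hadoop's Path API; the runtime directory layout is not part of this patch, and the scratch URI and sink ids are invented example values (requires hadoop-common on the classpath).

import org.apache.hadoop.fs.Path;

// Illustration only: a shared "_dpp_output_" root under a scratch directory,
// plus one hypothetical child per pruning-sink unique id.
public class DppOutputPathSketch {
  public static void main(String[] args) {
    Path scratchDir = new Path("hdfs://nn:8020/tmp/hive/scratch/query1");  // made-up scratch dir
    Path dppRoot = new Path(scratchDir, "_dpp_output_");
    System.out.println(dppRoot);
    for (String sinkId : new String[] {"SPARKPRUNINGSINK_0", "SPARKPRUNINGSINK_1"}) {
      System.out.println(new Path(dppRoot, sinkId));  // hypothetical per-sink subdirectory
    }
  }
}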
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
index bd9de09..034655f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@ -21,6 +21,7 @@ import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -55,6 +56,9 @@ protected transient Serializer serializer;
   protected transient DataOutputBuffer buffer;
   protected static final Logger LOG = LoggerFactory.getLogger(SparkPartitionPruningSinkOperator.class);
+  private static final AtomicLong sequenceNum = new AtomicLong(0);
+
+  private transient String uniqueId = null;
 
   /** Kryo ctor. */
   @VisibleForTesting
@@ -202,4 +206,14 @@ public static String getOperatorName() {
     return "SPARKPRUNINGSINK";
   }
 
+  public synchronized String getUniqueId() {
+    if (uniqueId == null) {
+      uniqueId = getOperatorId() + "_" + sequenceNum.getAndIncrement();
+    }
+    return uniqueId;
+  }
+
+  public synchronized void setUniqueId(String uniqueId) {
+    this.uniqueId = uniqueId;
+  }
 }
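Note: a minimal standalone rendering of the id scheme added above, to make the uniqueness guarantee explicit: even if two sink instances report the same operator id, the process-wide AtomicLong suffix keeps their unique ids distinct. The setter mirrors setUniqueId(String), presumably so a copy produced by plan serialization can keep the id assigned at compile time. The class name and ids below are invented for illustration; this is not Hive code.

import java.util.concurrent.atomic.AtomicLong;

// Standalone illustration of a lazily assigned "<operatorId>_<sequence>" id.
public class UniqueIdSketch {
  private static final AtomicLong SEQ = new AtomicLong(0);
  private final String operatorId;
  private String uniqueId;

  UniqueIdSketch(String operatorId) { this.operatorId = operatorId; }

  synchronized String getUniqueId() {
    if (uniqueId == null) {
      uniqueId = operatorId + "_" + SEQ.getAndIncrement();
    }
    return uniqueId;
  }

  // Lets a copy reuse a previously assigned id instead of drawing a new sequence number.
  synchronized void setUniqueId(String uniqueId) { this.uniqueId = uniqueId; }

  public static void main(String[] args) {
    UniqueIdSketch a = new UniqueIdSketch("SPARKPRUNINGSINK_25");
    UniqueIdSketch b = new UniqueIdSketch("SPARKPRUNINGSINK_25");
    System.out.println(a.getUniqueId());  // SPARKPRUNINGSINK_25_0
    System.out.println(b.getUniqueId());  // SPARKPRUNINGSINK_25_1
  }
}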
diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_4.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_4.q
index 240128f..e5f4874 100644
--- a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_4.q
+++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_4.q
@@ -153,5 +153,24 @@ select * from
 union all
 (select part2.key, part2.value from part2 join top on part2.p=top.key and part2.q=top.value);
 
+-- The following test case makes sure target map works can read from multiple DPP sinks,
+-- when the DPP sinks have different target lists
+-- see HIVE-18111
+
+create table foo(key string);
+insert into table foo values ('1'),('2');
+
+set hive.cbo.enable = false;
+
+explain
+select p from part2 where p in (select max(key) from foo)
+union all
+select p from part1 where p in (select max(key) from foo union all select min(key) from foo);
+
+select p from part2 where p in (select max(key) from foo)
+union all
+select p from part1 where p in (select max(key) from foo union all select min(key) from foo);
+
+drop table foo;
 drop table part1;
-drop table part2;
\ No newline at end of file
+drop table part2;
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_4.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_4.q.out
index 20fa5a7..a06c3e3 100644
--- a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_4.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_4.q.out
@@ -1873,6 +1873,335 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
+PREHOOK: query: create table foo(key string)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@foo
+POSTHOOK: query: create table foo(key string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@foo
+PREHOOK: query: insert into table foo values ('1'),('2')
+PREHOOK: type: QUERY
+PREHOOK: Output: default@foo
+POSTHOOK: query: insert into table foo values ('1'),('2')
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@foo
+POSTHOOK: Lineage: foo.key SIMPLE [(values__tmp__table__9)values__tmp__table__9.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: explain
+select p from part2 where p in (select max(key) from foo)
+union all
+select p from part1 where p in (select max(key) from foo union all select min(key) from foo)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select p from part2 where p in (select max(key) from foo)
+union all
+select p from part1 where p in (select max(key) from foo union all select min(key) from foo)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+      Edges:
+        Reducer 12 <- Map 11 (GROUP, 1)
+        Reducer 16 <- Map 15 (GROUP, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 11 
+            Map Operator Tree:
+                TableScan
+                  alias: foo
+                  Statistics: Num rows: 2 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string)
+                    outputColumnNames: key
+                    Statistics: Num rows: 2 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: max(key)
+                      mode: hash
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        sort order: 
+                        Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: string)
+        Map 15 
+            Map Operator Tree:
+                TableScan
+                  alias: foo
+                  Statistics: Num rows: 2 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string)
+                    outputColumnNames: key
+                    Statistics: Num rows: 2 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: min(key)
+                      mode: hash
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        sort order: 
+                        Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: string)
+        Reducer 12 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: max(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                Filter Operator
+                  predicate: _col0 is not null (type: boolean)
+                  Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                  Group By Operator
+                    keys: _col0 (type: string)
+                    mode: hash
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: _col0 (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        keys: _col0 (type: string)
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                        Spark Partition Pruning Sink Operator
+                          Target column: [1:p (string), 5:p (string)]
+                          partition key expr: [p, p]
+                          Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                          target works: [Map 1, Map 5]
+        Reducer 16 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                Filter Operator
+                  predicate: _col0 is not null (type: boolean)
+                  Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                  Group By Operator
+                    keys: _col0 (type: string)
+                    mode: hash
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: _col0 (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
+                      Group By Operator
+                        keys: _col0 (type: string)
+                        mode: hash
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
+                        Spark Partition Pruning Sink Operator
+                          Target column: [5:p (string)]
+                          partition key expr: [p]
+                          Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
+                          target works: [Map 5]
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 10 <- Map 9 (GROUP, 1)
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 4), Reducer 4 (PARTITION-LEVEL SORT, 4)
+        Reducer 4 <- Map 3 (GROUP, 1)
+        Reducer 6 <- Map 5 (PARTITION-LEVEL SORT, 4), Reducer 10 (PARTITION-LEVEL SORT, 4), Reducer 4 (PARTITION-LEVEL SORT, 4)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: part2
+                  Statistics: Num rows: 8 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    key expressions: p (type: string)
+                    sort order: +
+                    Map-reduce partition columns: p (type: string)
+                    Statistics: Num rows: 8 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: foo
+                  Statistics: Num rows: 2 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string)
+                    outputColumnNames: key
+                    Statistics: Num rows: 2 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: max(key)
+                      mode: hash
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        sort order: 
+                        Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: string)
+        Map 5 
+            Map Operator Tree:
+                TableScan
+                  alias: part1
+                  Statistics: Num rows: 8 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+                  Reduce Output Operator
+                    key expressions: p (type: string)
+                    sort order: +
+                    Map-reduce partition columns: p (type: string)
+                    Statistics: Num rows: 8 Data size: 24 Basic stats: COMPLETE Column stats: NONE
+        Map 9 
+            Map Operator Tree:
+                TableScan
+                  alias: foo
+                  Statistics: Num rows: 2 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string)
+                    outputColumnNames: key
+                    Statistics: Num rows: 2 Data size: 2 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: min(key)
+                      mode: hash
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        sort order: 
+                        Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: string)
+        Reducer 10 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: min(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                Filter Operator
+                  predicate: _col0 is not null (type: boolean)
+                  Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                  Group By Operator
+                    keys: _col0 (type: string)
+                    mode: hash
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col0 (type: string)
+                      Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Left Semi Join 0 to 1
+                outputColumnNames: _col2
+                Statistics: Num rows: 8 Data size: 26 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col2 (type: string)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 8 Data size: 26 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 16 Data size: 52 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 4 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: max(VALUE._col0)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                Filter Operator
+                  predicate: _col0 is not null (type: boolean)
+                  Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                  Group By Operator
+                    keys: _col0 (type: string)
+                    mode: hash
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col0 (type: string)
+                      Statistics: Num rows: 1 Data size: 184 Basic stats: COMPLETE Column stats: NONE
+        Reducer 6 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Left Semi Join 0 to 1
+                outputColumnNames: _col2
+                Statistics: Num rows: 8 Data size: 26 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col2 (type: string)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 8 Data size: 26 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 16 Data size: 52 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select p from part2 where p in (select max(key) from foo)
+union all
+select p from part1 where p in (select max(key) from foo union all select min(key) from foo)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@foo
+PREHOOK: Input: default@part1
+PREHOOK: Input: default@part1@p=1/q=1
+PREHOOK: Input: default@part1@p=1/q=2
+PREHOOK: Input: default@part1@p=2/q=1
+PREHOOK: Input: default@part1@p=2/q=2
+PREHOOK: Input: default@part2
+PREHOOK: Input: default@part2@p=3/q=3
+PREHOOK: Input: default@part2@p=3/q=4
+PREHOOK: Input: default@part2@p=4/q=3
+PREHOOK: Input: default@part2@p=4/q=4
+#### A masked pattern was here ####
+POSTHOOK: query: select p from part2 where p in (select max(key) from foo)
+union all
+select p from part1 where p in (select max(key) from foo union all select min(key) from foo)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@foo
+POSTHOOK: Input: default@part1
+POSTHOOK: Input: default@part1@p=1/q=1
+POSTHOOK: Input: default@part1@p=1/q=2
+POSTHOOK: Input: default@part1@p=2/q=1
+POSTHOOK: Input: default@part1@p=2/q=2
+POSTHOOK: Input: default@part2
+POSTHOOK: Input: default@part2@p=3/q=3
+POSTHOOK: Input: default@part2@p=3/q=4
+POSTHOOK: Input: default@part2@p=4/q=3
+POSTHOOK: Input: default@part2@p=4/q=4
+#### A masked pattern was here ####
+1
+1
+1
+1
+2
+2
+2
+2
+PREHOOK: query: drop table foo
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@foo
+PREHOOK: Output: default@foo
+POSTHOOK: query: drop table foo
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@foo
+POSTHOOK: Output: default@foo
 PREHOOK: query: drop table part1
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@part1