diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index d42b643..8a585fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -637,12 +637,15 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo // we're removing the reduce sink we need do remove that too. Set> dynamicPartitionOperators = new HashSet>(); for (Operator c : p.getChildOperators()) { - if (hasDynamicPartitionBroadcast(c)) { + if (hasDynamicPartitionBroadcast(c, context)) { dynamicPartitionOperators.add(c); } } for (Operator c : dynamicPartitionOperators) { - p.removeChild(c); + if (context.pruningOpsRemovedByRemoveDynamicPruningBySize.isEmpty() || + !context.pruningOpsRemovedByRemoveDynamicPruningBySize.contains(c)) { + p.removeChild(c); + } } } mapJoinOp.getParentOperators().remove(bigTablePosition); @@ -662,7 +665,7 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo return mapJoinOp; } - private boolean hasDynamicPartitionBroadcast(Operator parent) { + private boolean hasDynamicPartitionBroadcast(Operator parent, OptimizeTezProcContext context) { boolean hasDynamicPartitionPruning = false; for (Operator op : parent.getChildOperators()) { @@ -670,6 +673,7 @@ private boolean hasDynamicPartitionBroadcast(Operator parent) { if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) { // found dynamic partition pruning operator hasDynamicPartitionPruning = true; + context.pruningOpsRemovedByMapJoinConversion.add((AppMasterEventOperator)op); break; } if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java index 4803959..50af0b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/RemoveDynamicPruningBySize.java @@ -52,9 +52,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procContext, AppMasterEventDesc desc = event.getConf(); if (desc.getStatistics().getDataSize() > context.conf - .getLongVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) { + .getLongVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE) && + (context.pruningOpsRemovedByMapJoinConversion.isEmpty() || + !context.pruningOpsRemovedByMapJoinConversion.contains(event))) { Operator child = event; Operator curr = event; + context.pruningOpsRemovedByRemoveDynamicPruningBySize.add(event); while (curr.getChildOperators().size() <= 1) { child = curr; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java index ee71971..ed4003e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java @@ -49,6 +49,15 @@ public final Set inputs; public final Set outputs; + /* Two of the optimization rules, ConvertJoinMapJoin and RemoveDynamicPruningBySize, are put into + stats dependent optimizations and run together in TezCompiler. As ConvertJoinMapJoin is + removing the reduce sink, it may also have removed a dynamic partition pruning operator chain. + However, RemoveDynamicPruningBySize doesn't know this and still tries to traverse that removed + chain. To avoid that, we need to remember the leaf node(s) of that chain so it can be skipped. + */ + public HashSet pruningOpsRemovedByMapJoinConversion; + public HashSet pruningOpsRemovedByRemoveDynamicPruningBySize; + public final Set visitedReduceSinks = new HashSet(); @@ -66,6 +75,8 @@ public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext, Set(); + this.pruningOpsRemovedByRemoveDynamicPruningBySize = new HashSet(); } public void setRootOperators(Deque> roots) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index ea12990..ff80bfe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -149,7 +149,7 @@ private void runCycleAnalysisForPartitionPruning(OptimizeTezProcContext procCtx, if (component.size() != 1) { LOG.info("Found cycle in operator plan..."); cycleFree = false; - removeEventOperator(component); + removeEventOperator(component, procCtx); break; } } @@ -157,7 +157,7 @@ private void runCycleAnalysisForPartitionPruning(OptimizeTezProcContext procCtx, } } - private void removeEventOperator(Set> component) { + private void removeEventOperator(Set> component, OptimizeTezProcContext context) { AppMasterEventOperator victim = null; for (Operator o : component) { if (o instanceof AppMasterEventOperator) { @@ -169,6 +169,17 @@ private void removeEventOperator(Set> component) { } } + if (victim == null) { + return; + } + + if ((!context.pruningOpsRemovedByMapJoinConversion.isEmpty() && + context.pruningOpsRemovedByMapJoinConversion.contains(victim)) || + (!context.pruningOpsRemovedByRemoveDynamicPruningBySize.isEmpty() && + context.pruningOpsRemovedByRemoveDynamicPruningBySize.contains(victim))) { + return; + } + Operator child = victim; Operator curr = victim; diff --git a/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q b/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q index 5a7f113..a4e84b1 100644 --- a/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q +++ b/ql/src/test/queries/clientpositive/dynamic_partition_pruning_2.q @@ -118,3 +118,20 @@ SELECT amount FROM agg_01, dim_shops WHERE dim_shops_id = id AND label = 'bar'; SELECT amount FROM agg_01, dim_shops WHERE dim_shops_id = id AND label = 'foo' UNION ALL SELECT amount FROM agg_01, dim_shops WHERE dim_shops_id = id AND label = 'bar'; + +set hive.tez.dynamic.partition.pruning.max.event.size=1000000; +set hive.tez.dynamic.partition.pruning.max.data.size=10000; +-- Dynamic partition pruning will be removed as data size exceeds the limit; +-- and for self join on partitioning column, it should not fail (HIVE-10559). +explain +select count(*) +from srcpart s1, + srcpart s2 +where s1.ds = s2.ds +; + +select count(*) +from srcpart s1, + srcpart s2 +where s1.ds = s2.ds +; \ No newline at end of file diff --git a/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out b/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out index 8c8531c..8b0b81d 100644 --- a/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out +++ b/ql/src/test/results/clientpositive/tez/dynamic_partition_pruning_2.q.out @@ -963,3 +963,119 @@ POSTHOOK: Input: default@dim_shops 4 5 6 +PREHOOK: query: -- Dynamic partition pruning will be removed as data size exceeds the limit; +-- and for self join on partitioning column, it should not fail (HIVE-10559). +explain +select count(*) +from srcpart s1, + srcpart s2 +where s1.ds = s2.ds +PREHOOK: type: QUERY +POSTHOOK: query: -- Dynamic partition pruning will be removed as data size exceeds the limit; +-- and for self join on partitioning column, it should not fail (HIVE-10559). +explain +select count(*) +from srcpart s1, + srcpart s2 +where s1.ds = s2.ds +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Map 1 <- Map 3 (BROADCAST_EDGE) + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: s1 + filterExpr: ds is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: ds (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE + Map Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: string) + 1 _col0 (type: string) + input vertices: + 1 Map 3 + Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE + HybridGraceHashJoin: true + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Map 3 + Map Operator Tree: + TableScan + alias: s1 + filterExpr: ds is not null (type: boolean) + Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: ds (type: string) + outputColumnNames: _col0 + Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: string) + sort order: + + Map-reduce partition columns: _col0 (type: string) + Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) +from srcpart s1, + srcpart s2 +where s1.ds = s2.ds +PREHOOK: type: QUERY +PREHOOK: Input: default@srcpart +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) +from srcpart s1, + srcpart s2 +where s1.ds = s2.ds +POSTHOOK: type: QUERY +POSTHOOK: Input: default@srcpart +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11 +POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12 +#### A masked pattern was here #### +2000000