diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
index 312a02c..8391ebb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
@@ -58,6 +58,8 @@
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This transformation does optimization for enforcing bucketing and sorting.
@@ -120,6 +122,7 @@ private NodeProcessor getBucketSortReduceSinkProc(ParseContext pctx) {
    *
    */
   public class BucketSortReduceSinkProcessor implements NodeProcessor {
+    private final Logger LOG = LoggerFactory.getLogger(BucketSortReduceSinkProcessor.class);
 
     protected ParseContext pGraphContext;
 
     public BucketSortReduceSinkProcessor(ParseContext pGraphContext) {
@@ -364,6 +367,14 @@ else if (tableTag != columnTableMappings[colNumber]) {
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
 
+      // We should not use this optimization if sorted dynamic partition optimizer is used,
+      // as RS will be required.
+      if (pGraphContext.isReduceSinkAddedBySortedDynPartition()) {
+        LOG.info("Reduce Sink is added by Sorted Dynamic Partition Optimizer. Bailing out of"
+            + " Bucketing Sorting Reduce Sink Optimizer");
+        return null;
+      }
+
       // If the reduce sink has not been introduced due to bucketing/sorting, ignore it
       FileSinkOperator fsOp = (FileSinkOperator) nd;
       ReduceSinkOperator rsOp = (ReduceSinkOperator) fsOp.getParentOperators().get(0).getParentOperators().get(0);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
index 7ec068c..b82d610 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -107,12 +107,20 @@ public void initialize(HiveConf hiveConf) {
       transformations.add(new SyntheticJoinPredicate());
       transformations.add(new SimplePredicatePushDown());
     }
+
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
       // We run constant propagation twice because after predicate pushdown, filter expressions
       // are combined and may become eligible for reduction (like is not null filter).
       transformations.add(new ConstantPropagate());
     }
+
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONING) &&
+        HiveConf.getVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equals("nonstrict") &&
+        HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITION) &&
+        !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) {
+      transformations.add(new SortedDynPartitionOptimizer());
+    }
+
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
       transformations.add(new PartitionPruner());
       transformations.add(new PartitionConditionRemover());
@@ -181,12 +189,6 @@ public void initialize(HiveConf hiveConf) {
       transformations.add(new FixedBucketPruningOptimizer());
     }
 
-    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONING) &&
-        HiveConf.getVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equals("nonstrict") &&
-        HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITION) &&
-        !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) {
-      transformations.add(new SortedDynPartitionOptimizer());
-    }
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
       transformations.add(new ReduceSinkDeDuplication());
     }
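Note on the Optimizer.java change above: only the registration order moves. SortedDynPartitionOptimizer now runs right after constant propagation, before the partition-pruning and ReduceSink-deduplication passes, instead of after FixedBucketPruningOptimizer, so the ReduceSink it may insert is already in the plan when those later passes inspect it. The four-condition guard itself is unchanged; restated as a sketch with a hypothetical helper (shouldAddSortedDynPartition is not part of the patch):

import org.apache.hadoop.hive.conf.HiveConf;

// Sketch only: mirrors the four HiveConf checks guarding SortedDynPartitionOptimizer.
static boolean shouldAddSortedDynPartition(HiveConf hiveConf) {
  // dynamic partitioning is enabled, in nonstrict mode, the sorted-dynamic-partition
  // rewrite is switched on, and list bucketing is off
  return HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONING)
      && HiveConf.getVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equals("nonstrict")
      && HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITION)
      && !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTLISTBUCKETING);
}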
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index d4be5cf..62d10ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -143,6 +143,15 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         return null;
       }
 
+      // look up the parent of FS; the FS-parent link itself is only unlinked later
+      Operator<? extends OperatorDesc> fsParent = fsOp.getParentOperators().get(0);
+      // if all dp columns got constant folded then disable this optimization
+      if (allStaticPartitions(fsParent, fsOp.getConf().getDynPartCtx())) {
+        LOG.debug("Bailing out of sorted dynamic partition optimizer as all dynamic partition"
+            + " columns got constant folded (static partitioning)");
+        return null;
+      }
+
       // if RS is inserted by enforce bucketing or sorting, we need to remove it
       // since ReduceSinkDeDuplication will not merge them to single RS.
       // RS inserted by enforce bucketing/sorting will have bucketing column in
@@ -158,7 +167,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       }
 
       // unlink connection between FS and its parent
-      Operator<? extends OperatorDesc> fsParent = fsOp.getParentOperators().get(0);
+      fsParent = fsOp.getParentOperators().get(0);
       fsParent.getChildOperators().clear();
 
       DynamicPartitionCtx dpCtx = fsOp.getConf().getDynPartCtx();
@@ -261,9 +270,39 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       LOG.info("Inserted " + rsOp.getOperatorId() + " and " + selOp.getOperatorId()
           + " as parent of " + fsOp.getOperatorId() + " and child of " + fsParent.getOperatorId());
+
+      parseCtx.setReduceSinkAddedBySortedDynPartition(true);
       return null;
     }
 
+    private boolean allStaticPartitions(Operator<? extends OperatorDesc> op,
+        final DynamicPartitionCtx dynPartCtx) {
+      int numDpCols = dynPartCtx.getNumDPCols();
+      int numCols = op.getSchema().getColumnNames().size();
+      List<String> dpCols = op.getSchema().getColumnNames().subList(numCols - numDpCols, numCols);
+      boolean allConstants = true;
+      if (op.getColumnExprMap() == null) {
+        // find first operator upstream with valid (non-null) column expression map
+        for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+          if (parent.getColumnExprMap() != null) {
+            op = parent;
+            break;
+          }
+        }
+      }
+      if (op.getColumnExprMap() != null) {
+        for (String dpCol : dpCols) {
+          ExprNodeDesc end = op.getColumnExprMap().get(dpCol);
+          if (!(end instanceof ExprNodeConstantDesc)) {
+            allConstants = false;
+          }
+        }
+      } else {
+        return false;
+      }
+      return allConstants;
+    }
+
     // Remove RS and SEL introduced by enforce bucketing/sorting config
     // Convert PARENT -> RS -> SEL -> FS to PARENT -> FS
     private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index bee0175..642c227 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -105,6 +105,7 @@
   private AnalyzeRewriteContext analyzeRewrite;
 
   private CreateTableDesc createTableDesc;
+  private boolean reduceSinkAddedBySortedDynPartition;
 
   public ParseContext() {
@@ -530,4 +531,12 @@ public void setCreateTable(CreateTableDesc createTableDesc) {
     this.createTableDesc = createTableDesc;
   }
 
+  public void setReduceSinkAddedBySortedDynPartition(
+      final boolean reduceSinkAddedBySortedDynPartition) {
+    this.reduceSinkAddedBySortedDynPartition = reduceSinkAddedBySortedDynPartition;
+  }
+
+  public boolean isReduceSinkAddedBySortedDynPartition() {
+    return reduceSinkAddedBySortedDynPartition;
+  }
 }
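Taken together, the Java changes form a small handshake through ParseContext: SortedDynPartitionOptimizer records that it inserted a ReduceSink, and BucketingSortingReduceSinkOptimizer consults that flag so it does not mistake this ReduceSink for one inserted by enforce-bucketing/sorting. A condensed sketch of the flag traffic (method bodies elided; both passes operate on the same ParseContext instance):

// In SortedDynPartitionOptimizer, after SEL + RS are inserted above the FileSink:
parseCtx.setReduceSinkAddedBySortedDynPartition(true);

// Later, in BucketSortReduceSinkProcessor.process():
if (pGraphContext.isReduceSinkAddedBySortedDynPartition()) {
  // the RS feeding the FileSink belongs to the sorted-dynamic-partition
  // rewrite, so this optimizer must leave the plan alone
  return null;
}

The golden-file updates below reflect the other half of the patch: with every dynamic partition column folded to the constant 'day', allStaticPartitions() now disables the rewrite, so the plans lose the extra shuffle stage that previously sorted on the constant column.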
diff --git a/ql/src/test/results/clientpositive/dynpart_sort_optimization2.q.out b/ql/src/test/results/clientpositive/dynpart_sort_optimization2.q.out
index 0100eea..3b24a2e 100644
--- a/ql/src/test/results/clientpositive/dynpart_sort_optimization2.q.out
+++ b/ql/src/test/results/clientpositive/dynpart_sort_optimization2.q.out
@@ -1642,9 +1642,8 @@ group by "day", key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-2 depends on stages: Stage-1
-  Stage-0 depends on stages: Stage-2
-  Stage-3 depends on stages: Stage-0
+  Stage-0 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-0
 
 STAGE PLANS:
   Stage: Stage-1
@@ -1682,34 +1681,12 @@
             Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
             File Output Operator
               compressed: false
-              table:
-                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
-
-  Stage: Stage-2
-    Map Reduce
-      Map Operator Tree:
-          TableScan
-            Reduce Output Operator
-              key expressions: _col2 (type: string)
-              sort order: +
-              Map-reduce partition columns: _col2 (type: string)
              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-              value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: string)
-      Reduce Operator Tree:
-        Select Operator
-          expressions: VALUE._col0 (type: int), VALUE._col1 (type: int), VALUE._col2 (type: string)
-          outputColumnNames: _col0, _col1, _col2
-          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-          File Output Operator
-            compressed: false
-            Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-            table:
-                input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
-                output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
-                serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                name: default.hive13_dp1
+              table:
+                  input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                  serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                  name: default.hive13_dp1
 
   Stage: Stage-0
     Move Operator
@@ -1723,7 +1700,7 @@
               serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
               name: default.hive13_dp1
 
-  Stage: Stage-3
+  Stage: Stage-2
     Stats-Aggr Operator
 
 PREHOOK: query: insert overwrite table `hive13_dp1` partition(`day`)
diff --git a/ql/src/test/results/clientpositive/llap/dynpart_sort_optimization2.q.out b/ql/src/test/results/clientpositive/llap/dynpart_sort_optimization2.q.out
index 34c2307..06e94c3 100644
--- a/ql/src/test/results/clientpositive/llap/dynpart_sort_optimization2.q.out
+++ b/ql/src/test/results/clientpositive/llap/dynpart_sort_optimization2.q.out
@@ -200,7 +200,7 @@ Database:            default
 Table:               ss_part
 #### A masked pattern was here ####
 Partition Parameters:
-  COLUMN_STATS_ACCURATE  true
+  COLUMN_STATS_ACCURATE  {\"BASIC_STATS\":\"true\"}
   numFiles               1
   numRows                11
   rawDataSize            151
@@ -260,7 +260,7 @@ Database:            default
 Table:               ss_part
 #### A masked pattern was here ####
 Partition Parameters:
-  COLUMN_STATS_ACCURATE  true
+  COLUMN_STATS_ACCURATE  {\"BASIC_STATS\":\"true\"}
   numFiles               1
   numRows                13
   rawDataSize            186
@@ -429,7 +429,7 @@ Database:            default
 Table:               ss_part
 #### A masked pattern was here ####
 Partition Parameters:
-  COLUMN_STATS_ACCURATE  true
+  COLUMN_STATS_ACCURATE  {\"BASIC_STATS\":\"true\"}
   numFiles               1
   numRows                11
   rawDataSize            151
@@ -489,7 +489,7 @@ Database:            default
 Table:               ss_part
 #### A masked pattern was here ####
 Partition Parameters:
-  COLUMN_STATS_ACCURATE  true
+  COLUMN_STATS_ACCURATE  {\"BASIC_STATS\":\"true\"}
   numFiles               1
   numRows                13
   rawDataSize            186
@@ -679,7 +679,7 @@ Database:            default
 Table:               ss_part
 #### A masked pattern was here ####
 Partition Parameters:
-  COLUMN_STATS_ACCURATE  true
+  COLUMN_STATS_ACCURATE  {\"BASIC_STATS\":\"true\"}
   numFiles               1
   numRows                11
   rawDataSize            151
@@ -739,7 +739,7 @@ Database:            default
 Table:               ss_part
 #### A masked pattern was here ####
 Partition Parameters:
-  COLUMN_STATS_ACCURATE  true
+  COLUMN_STATS_ACCURATE  {\"BASIC_STATS\":\"true\"}
   numFiles               1
   numRows                13
   rawDataSize            186
@@ -907,7 +907,7 @@ Database:            default
 Table:               ss_part
 #### A masked pattern was here ####
 Partition Parameters:
-  COLUMN_STATS_ACCURATE  true
+  COLUMN_STATS_ACCURATE  {\"BASIC_STATS\":\"true\"}
   numFiles               1
   numRows                11
   rawDataSize            151
@@ -967,7 +967,7 @@ Database:            default
 Table:               ss_part
 #### A masked pattern was here ####
 Partition Parameters:
-  COLUMN_STATS_ACCURATE  true
+  COLUMN_STATS_ACCURATE  {\"BASIC_STATS\":\"true\"}
   numFiles               1
   numRows                13
   rawDataSize            186
@@ -1212,11 +1212,11 @@ Database:            default
 Table:               ss_part_orc
 #### A masked pattern was here ####
 Partition Parameters:
-  COLUMN_STATS_ACCURATE  true
+  COLUMN_STATS_ACCURATE  {\"BASIC_STATS\":\"true\"}
   numFiles               1
   numRows                11
   rawDataSize            88
-  totalSize              433
+  totalSize              454
 #### A masked pattern was here ####
 
 # Storage Information
@@ -1272,11 +1272,11 @@ Database:            default
 Table:               ss_part_orc
 #### A masked pattern was here ####
 Partition Parameters:
-  COLUMN_STATS_ACCURATE  true
+  COLUMN_STATS_ACCURATE  {\"BASIC_STATS\":\"true\"}
   numFiles               1
   numRows                13
   rawDataSize            104
-  totalSize              456
+  totalSize              477
 #### A masked pattern was here ####
 
 # Storage Information
@@ -1440,11 +1440,11 @@ Database:            default
 Table:               ss_part_orc
 #### A masked pattern was here ####
 Partition Parameters:
-  COLUMN_STATS_ACCURATE  true
+  COLUMN_STATS_ACCURATE  {\"BASIC_STATS\":\"true\"}
   numFiles               1
   numRows                11
   rawDataSize            88
-  totalSize              433
+  totalSize              454
 #### A masked pattern was here ####
 
 # Storage Information
@@ -1500,11 +1500,11 @@ Database:            default
 Table:               ss_part_orc
 #### A masked pattern was here ####
 Partition Parameters:
-  COLUMN_STATS_ACCURATE  true
+  COLUMN_STATS_ACCURATE  {\"BASIC_STATS\":\"true\"}
   numFiles               1
   numRows                13
   rawDataSize            104
-  totalSize              456
+  totalSize              477
 #### A masked pattern was here ####
 
 # Storage Information
@@ -1623,9 +1623,9 @@ STAGE PLANS:
                     outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                     Reduce Output Operator
-                      key expressions: _col0 (type: string), _col1 (type: string)
+                      key expressions: 'day' (type: string), _col1 (type: string)
                       sort order: ++
-                      Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                      Map-reduce partition columns: 'day' (type: string), _col1 (type: string)
                       Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                       value expressions: _col2 (type: bigint)
             Execution mode: llap
@@ -1634,12 +1634,12 @@
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
-                keys: KEY._col0 (type: string), KEY._col1 (type: string)
+                keys: 'day' (type: string), KEY._col1 (type: string)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1, _col2
                 Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
                 Select Operator
-                  expressions: UDFToInteger(_col1) (type: int), UDFToInteger(_col2) (type: int), _col0 (type: string)
+                  expressions: UDFToInteger(_col1) (type: int), UDFToInteger(_col2) (type: int), 'day' (type: string)
                   outputColumnNames: _col0, _col1, _col2
                   Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
                   File Output Operator
@@ -1751,9 +1751,9 @@ STAGE PLANS:
                     outputColumnNames: _col0, _col1, _col2
                     Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                     Reduce Output Operator
-                      key expressions: _col0 (type: string), _col1 (type: string)
+                      key expressions: 'day' (type: string), _col1 (type: string)
                       sort order: ++
-                      Map-reduce partition columns: _col0 (type: string)
+                      Map-reduce partition columns: 'day' (type: string), _col1 (type: string)
                       Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                       value expressions: _col2 (type: bigint)
             Execution mode: llap
@@ -1762,12 +1762,12 @@
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
-                keys: KEY._col0 (type: string), KEY._col1 (type: string)
+                keys: 'day' (type: string), KEY._col1 (type: string)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1, _col2
                 Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
                 Select Operator
-                  expressions: UDFToInteger(_col1) (type: int), UDFToInteger(_col2) (type: int), _col0 (type: string)
+                  expressions: UDFToInteger(_col1) (type: int), UDFToInteger(_col2) (type: int), 'day' (type: string)
                   outputColumnNames: _col0, _col1, _col2
                   Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
                   File Output Operator
diff --git a/ql/src/test/results/clientpositive/tez/dynpart_sort_optimization2.q.out b/ql/src/test/results/clientpositive/tez/dynpart_sort_optimization2.q.out
index 1057110..bfa0a7b 100644
--- a/ql/src/test/results/clientpositive/tez/dynpart_sort_optimization2.q.out
+++ b/ql/src/test/results/clientpositive/tez/dynpart_sort_optimization2.q.out
@@ -1724,7 +1724,6 @@ STAGE PLANS:
     Tez
       Edges:
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1
@@ -1761,27 +1760,14 @@
                 expressions: UDFToInteger(_col1) (type: int), UDFToInteger(_col2) (type: int), 'day' (type: string)
                 outputColumnNames: _col0, _col1, _col2
                 Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col2 (type: string)
-                  sort order: +
-                  Map-reduce partition columns: _col2 (type: string)
+                File Output Operator
+                  compressed: false
                   Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                  value expressions: _col0 (type: int), _col1 (type: int), _col2 (type: string)
-        Reducer 3
-            Execution mode: vectorized
-            Reduce Operator Tree:
-              Select Operator
-                expressions: VALUE._col0 (type: int), VALUE._col1 (type: int), VALUE._col2 (type: string)
-                outputColumnNames: _col0, _col1, _col2
-                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                  table:
-                      input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
-                      serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
-                      name: default.hive13_dp1
+                  table:
+                      input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                      serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                      name: default.hive13_dp1
 
   Stage: Stage-2
     Dependency Collection
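For context on the allStaticPartitions() bail-out exercised by these plans: the partition column expression is the literal 'day', so after constant propagation the operator's column expression map binds the dynamic partition column to an ExprNodeConstantDesc, and the insert is effectively a static-partition write. A standalone sketch of that core test (the column name _col2 and the map contents are illustrative only, not taken from the patch):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;

public class DpConstantFoldCheck {
  public static void main(String[] args) {
    // after constant propagation, the dynamic partition column maps to a literal
    Map<String, ExprNodeDesc> colExprMap = new HashMap<>();
    colExprMap.put("_col2", new ExprNodeConstantDesc("day"));

    ExprNodeDesc end = colExprMap.get("_col2");
    boolean allStatic = end instanceof ExprNodeConstantDesc;
    // true -> SortedDynPartitionOptimizer bails out; no extra sort/shuffle stage
    System.out.println("all dp columns constant folded: " + allStatic);
  }
}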