diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 36b7036..febd446 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -57,7 +57,6 @@
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -193,8 +192,16 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         sortPositions = Arrays.asList(0);
         sortOrder = Arrays.asList(1); // 1 means asc, could really use enum here in the thrift if
       } else {
-        sortPositions = getSortPositions(destTable.getSortCols(), destTable.getCols());
-        sortOrder = getSortOrders(destTable.getSortCols(), destTable.getCols());
+        if (!destTable.getSortCols().isEmpty()) {
+          // Sort columns specified by table
+          sortPositions = getSortPositions(destTable.getSortCols(), destTable.getCols());
+          sortOrder = getSortOrders(destTable.getSortCols(), destTable.getCols());
+        } else {
+          // Infer sort columns from operator tree
+          sortPositions = Lists.newArrayList();
+          sortOrder = Lists.newArrayList();
+          inferSortPositions(fsParent, sortPositions, sortOrder);
+        }
       }
       List<Integer> sortNullOrder = new ArrayList<Integer>();
       for (int order : sortOrder) {
@@ -380,6 +387,41 @@ private boolean removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) {
       return posns;
     }
 
+    // Try to infer possible sort columns in the query,
+    // i.e. the sequence must be pRS-SEL*-fsParent.
+    // If the sort columns cannot be inferred, sortPositions and sortOrder are left empty.
+    private void inferSortPositions(Operator<? extends OperatorDesc> fsParent,
+        List<Integer> sortPositions, List<Integer> sortOrder) throws SemanticException {
+      // If it is not a SEL operator, we bail out
+      if (!(fsParent instanceof SelectOperator)) {
+        return;
+      }
+      SelectOperator pSel = (SelectOperator) fsParent;
+      Operator<? extends OperatorDesc> parent = pSel;
+      while (!(parent instanceof ReduceSinkOperator)) {
+        if (parent.getNumParent() != 1 ||
+            !(parent instanceof SelectOperator)) {
+          return;
+        }
+        parent = parent.getParentOperators().get(0);
+      }
+      // Backtrack SEL columns to pRS
+      List<ExprNodeDesc> selColsInPRS =
+          ExprNodeDescUtils.backtrack(pSel.getConf().getColList(), pSel, parent);
+      ReduceSinkOperator pRS = (ReduceSinkOperator) parent;
+      for (int i = 0; i < pRS.getConf().getKeyCols().size(); i++) {
+        ExprNodeDesc col = pRS.getConf().getKeyCols().get(i);
+        int pos = selColsInPRS.indexOf(col);
+        if (pos == -1) {
+          sortPositions.clear();
+          sortOrder.clear();
+          return;
+        }
+        sortPositions.add(pos);
+        sortOrder.add(pRS.getConf().getOrder().charAt(i) == '+' ? 1 : 0); // 1 asc, 0 desc
+      }
+    }
+
     public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions,
         List<Integer> sortPositions, List<Integer> sortOrder, List<Integer> sortNullOrder,
         ArrayList<ExprNodeDesc> allCols, ArrayList<ExprNodeDesc> bucketColumns, int numBuckets,
diff --git ql/src/test/queries/clientpositive/dynpart_sort_optimization_acid2.q ql/src/test/queries/clientpositive/dynpart_sort_optimization_acid2.q
new file mode 100644
index 0000000..c115e62
--- /dev/null
+++ ql/src/test/queries/clientpositive/dynpart_sort_optimization_acid2.q
@@ -0,0 +1,12 @@
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.optimize.sort.dynamic.partition=true;
+
+CREATE TABLE non_acid(key string, value string)
+PARTITIONED BY(ds string, hr int)
+CLUSTERED BY(key) INTO 2 BUCKETS
+STORED AS ORC;
+
+explain
+insert into table non_acid partition(ds,hr) select * from srcpart sort by value;
diff --git ql/src/test/results/clientpositive/dynpart_sort_optimization_acid2.q.out ql/src/test/results/clientpositive/dynpart_sort_optimization_acid2.q.out
new file mode 100644
index 0000000..0b6e992
--- /dev/null
+++ ql/src/test/results/clientpositive/dynpart_sort_optimization_acid2.q.out
@@ -0,0 +1,72 @@
+PREHOOK: query: CREATE TABLE non_acid(key string, value string)
+PARTITIONED BY(ds string, hr int)
+CLUSTERED BY(key) INTO 2 BUCKETS
+STORED AS ORC
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@non_acid
+POSTHOOK: query: CREATE TABLE non_acid(key string, value string)
+PARTITIONED BY(ds string, hr int)
+CLUSTERED BY(key) INTO 2 BUCKETS
+STORED AS ORC
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@non_acid
+PREHOOK: query: explain
+insert into table non_acid partition(ds,hr) select * from srcpart sort by value
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+insert into table non_acid partition(ds,hr) select * from srcpart sort by value
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: srcpart
+            Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: string), value (type: string), ds (type: string), hr (type: string)
+              outputColumnNames: _col0, _col1, _col2, _col3
+              Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+              Reduce Output Operator
+                key expressions: _col2 (type: string), _col3 (type: string), '_bucket_number' (type: string), _col1 (type: string)
+                sort order: ++++
+                Map-reduce partition columns: _col2 (type: string), _col3 (type: string)
+                Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                value expressions: _col0 (type: string)
+      Reduce Operator Tree:
+        Select Operator
+          expressions: VALUE._col0 (type: string), KEY._col1 (type: string), KEY._col2 (type: string), KEY._col3 (type: string), KEY.'_bucket_number' (type: string)
+          outputColumnNames: _col0, _col1, _col2, _col3, '_bucket_number'
+          Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+            table:
+                input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                name: default.non_acid
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            ds
+            hr
+          replace: false
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.non_acid
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
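
Note (illustration, not part of the patch): inferSortPositions walks from the FileSink's parent through a chain of SelectOperators up to the ReduceSinkOperator (pRS), backtracks the SELECT columns to pRS, and then maps each pRS key column to its position in the SELECT list. Below is a minimal standalone sketch of that mapping step, using plain strings in place of Hive's ExprNodeDesc expressions; the class and variable names are hypothetical, not Hive API.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class InferSortSketch {

      // Mirror of the patch's loop: for each ReduceSink key column, find its
      // position in the SELECT list; on any miss, clear both lists and bail,
      // i.e. no sort columns can be inferred.
      static void inferSortPositions(List<String> selColsInPRS, List<String> keyCols,
          String order, List<Integer> sortPositions, List<Integer> sortOrder) {
        for (int i = 0; i < keyCols.size(); i++) {
          int pos = selColsInPRS.indexOf(keyCols.get(i));
          if (pos == -1) {
            sortPositions.clear();
            sortOrder.clear();
            return;
          }
          sortPositions.add(pos);
          sortOrder.add(order.charAt(i) == '+' ? 1 : 0); // 1 asc, 0 desc
        }
      }

      public static void main(String[] args) {
        // Mimics the test query: the SELECT emits key, value, ds, hr and the
        // ReduceSink sorts on value ascending ("sort by value").
        List<Integer> sortPositions = new ArrayList<>();
        List<Integer> sortOrder = new ArrayList<>();
        inferSortPositions(Arrays.asList("key", "value", "ds", "hr"),
            Arrays.asList("value"), "+", sortPositions, sortOrder);
        System.out.println(sortPositions + " " + sortOrder); // prints [1] [1]
      }
    }

This is why the q.out plan above sorts on _col1 (value) after the partition and bucket-number keys: the table declares no SORT BY columns, so the optimizer recovers the sort from the query's own ReduceSink.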